Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build_and_test_and_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ jobs:
- name: "Submit coverage to Coveralls"
if: "!contains(github.event.head_commit.message, '[skip test]')"
env:
COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
source "$HOME"/.poetry/env
poetry run coveralls
poetry run coveralls --service=github
- name: "Build doc / Asciicast bundle and push out Docker engine (tags only)"
if: "startsWith(github.ref, 'refs/tags/')"
env:
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/ambv/black
rev: '20.8b1'
rev: '21.5b1'
hooks:
- id: black
language_version: python3.8
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v0.2.14 (2021-05-05)

* Functionality to dump and load a Splitgraph catalog to/from a special `repositories.yml` format (https://github.com/splitgraph/splitgraph/pull/445)

Full set of changes: [`v0.2.13...v0.2.14`](https://github.com/splitgraph/splitgraph/compare/v0.2.13...v0.2.14)

## v0.2.13 (2021-04-14)

* Various fixes to CSV inference and querying (https://github.com/splitgraph/splitgraph/pull/433)
Expand Down
233 changes: 118 additions & 115 deletions poetry.lock

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions splitgraph/hooks/data_source/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ def __init__(
def introspect(self) -> IntrospectionResult:
    """
    Inspect the remote data source and return its available tables/schemas.

    Abstract: concrete data sources must override this.
    """
    raise NotImplementedError

def get_raw_url(
    self, tables: Optional[TableInfo] = None, expiry: int = 3600
) -> Dict[str, List[Tuple[str, str]]]:
    """
    Return public download URLs for each table served by this data source,
    e.g. so that the data can be exported as CSV.

    The URLs may be short-lived (e.g. pre-signed S3 links) but must be
    fetchable without authentication.

    :param tables: A TableInfo object overriding the table params of the source
    :param expiry: The URL should be valid for at least this many seconds
    :return: Dict of table_name -> list of (mimetype, raw URL)
    """
    # By default a data source exposes no raw URLs; subclasses override this.
    return {}


class MountableDataSource(DataSource, ABC):
supports_mount = True
Expand Down
43 changes: 43 additions & 0 deletions splitgraph/ingestion/csv/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import timedelta

import json
from copy import deepcopy
from typing import Optional, TYPE_CHECKING, Dict, List, Tuple, Any
Expand Down Expand Up @@ -265,3 +267,44 @@ def get_server_options(self):
def get_remote_schema_name(self) -> str:
    """Return a constant placeholder remote schema name.

    The schema is irrelevant for this source: the bucket/prefix passed in
    the params determines what actually gets imported.
    """
    return "data"

@staticmethod
def _get_url(merged_options: Dict[str, Any], expiry: int = 3600) -> List[Tuple[str, str]]:
    """
    Resolve a single table's merged options into (mimetype, URL) pairs.

    HTTP(S) sources are passed through unchanged (all such URLs are
    currently public anyway); S3 objects get a pre-signed GET URL.

    :param merged_options: Combined credentials/params/table options
    :param expiry: Pre-signed URLs should be valid for this many seconds
    :return: List of (mimetype, URL); empty if no URL can be built
    """
    # Local import so that Minio isn't required at module load time.
    from splitgraph.ingestion.csv.common import get_s3_params

    direct_url = merged_options.get("url")
    if direct_url:
        return [("text/csv", direct_url)]

    object_name = merged_options.get("s3_object")
    if not object_name:
        return []

    # Instantiate the Minio client and pre-sign a URL for the object.
    client, bucket, _ = get_s3_params(merged_options)
    signed_url = client.presigned_get_object(
        bucket_name=bucket,
        object_name=object_name,
        expires=timedelta(seconds=expiry),
    )
    return [("text/csv", signed_url)]

def get_raw_url(
    self, tables: Optional[TableInfo] = None, expiry: int = 3600
) -> Dict[str, List[Tuple[str, str]]]:
    """
    Get public (possibly pre-signed, temporary) URLs for each table in this
    CSV data source.

    :param tables: A TableInfo object overriding the table params of the source
    :param expiry: Pre-signed URLs should be valid for at least this many seconds
    :return: Dict of table_name -> list of (mimetype, raw URL)
    """
    tables = tables or self.tables
    if not tables:
        return {}
    result: Dict[str, List[Tuple[str, str]]] = {}

    # Merge the table options to take care of overrides and use them to get URLs
    # for each table.
    for table in tables:
        table_options = super().get_table_options(table, tables)
        full_options = {**self.credentials, **self.params, **table_options}
        # Bug fix: forward the caller's expiry instead of silently using the
        # default 3600s for every pre-signed URL.
        result[table] = self._get_url(full_options, expiry=expiry)

    return result
19 changes: 19 additions & 0 deletions splitgraph/ingestion/csv/common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import csv
import io
from minio import Minio
from typing import Dict, Tuple, NamedTuple, TYPE_CHECKING, Any

if TYPE_CHECKING:
Expand Down Expand Up @@ -131,3 +132,21 @@ def make_csv_reader(

reader = csv.reader(io_stream, **csv_options.to_csv_kwargs())
return csv_options, reader


def get_s3_params(fdw_options: Dict[str, Any]) -> Tuple[Minio, str, str]:
    """
    Build a Minio client and extract the target bucket/prefix from FDW options.

    :param fdw_options: FDW/server options containing the `s3_*` keys
    :return: Tuple of (Minio client, bucket name, object prefix)
    """
    client = Minio(
        endpoint=fdw_options["s3_endpoint"],
        access_key=fdw_options.get("s3_access_key"),
        secret_key=fdw_options.get("s3_secret_key"),
        secure=get_bool(fdw_options, "s3_secure"),
        region=fdw_options.get("s3_region"),
    )

    # The object name is split into a prefix + object ID so that a bunch of
    # objects sharing the same prefix can be mounted as separate CSV files.
    object_prefix = fdw_options.get("s3_object_prefix", "")

    return client, fdw_options["s3_bucket"], object_prefix
26 changes: 4 additions & 22 deletions splitgraph/ingestion/csv/fdw.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import splitgraph.config
from splitgraph.exceptions import get_exception_name
from splitgraph.ingestion.common import generate_column_names
from splitgraph.ingestion.csv.common import CSVOptions, get_bool, make_csv_reader
from splitgraph.ingestion.csv.common import CSVOptions, get_bool, make_csv_reader, get_s3_params
from splitgraph.ingestion.inference import infer_sg_schema

try:
Expand Down Expand Up @@ -190,7 +190,7 @@ def import_schema(cls, schema, srv_options, options, restriction_type, restricts
return []
else:
# Get S3 options
client, bucket, prefix = cls._get_s3_params(srv_options)
client, bucket, prefix = get_s3_params(srv_options)

# Note that we ignore the "schema" here (e.g. IMPORT FOREIGN SCHEMA some_schema)
# and take all interesting parameters through FDW options.
Expand Down Expand Up @@ -225,7 +225,7 @@ def import_schema(cls, schema, srv_options, options, restriction_type, restricts
if "s3_object" in this_table_options:
# TODO: we can support overriding S3 params per-table here, but currently
# we don't do it.
client, bucket, _ = cls._get_s3_params(srv_options)
client, bucket, _ = get_s3_params(srv_options)
result.append(
cls._introspect_s3(
client,
Expand Down Expand Up @@ -284,24 +284,6 @@ def _introspect_url(
stream = gzip.GzipFile(fileobj=stream)
return _get_table_definition(stream, srv_options, table_name, table_options)

@classmethod
def _get_s3_params(cls, fdw_options) -> Tuple[Minio, str, str]:
s3_client = Minio(
endpoint=fdw_options["s3_endpoint"],
access_key=fdw_options.get("s3_access_key"),
secret_key=fdw_options.get("s3_secret_key"),
secure=get_bool(fdw_options, "s3_secure"),
region=fdw_options.get("s3_region"),
)

s3_bucket = fdw_options["s3_bucket"]

# We split the object into a prefix + object ID to let us mount a bunch of objects
# with the same prefix as CSV files.
s3_object_prefix = fdw_options.get("s3_object_prefix", "")

return s3_client, s3_bucket, s3_object_prefix

def __init__(self, fdw_options, fdw_columns):
# Initialize the logger that will log to the engine's stderr: log timestamp and PID.

Expand All @@ -324,6 +306,6 @@ def __init__(self, fdw_options, fdw_columns):
self.url = fdw_options["url"]
else:
self.mode = "s3"
self.s3_client, self.s3_bucket, self.s3_object_prefix = self._get_s3_params(fdw_options)
self.s3_client, self.s3_bucket, self.s3_object_prefix = get_s3_params(fdw_options)

self.s3_object = fdw_options["s3_object"]
27 changes: 26 additions & 1 deletion splitgraph/ingestion/socrata/mount.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import logging
from copy import deepcopy
from typing import Optional, Dict, List
from typing import Optional, Dict, List, Tuple

from psycopg2.sql import SQL, Identifier

Expand Down Expand Up @@ -116,6 +116,31 @@ def _create_foreign_tables(
self.engine.run_sql(SQL(";").join(mount_statements), mount_args)
return []

def get_raw_url(
    self, tables: Optional[TableInfo] = None, expiry: int = 3600
) -> Dict[str, List[Tuple[str, str]]]:
    """
    Return the Socrata CSV export endpoint for every table.

    Socrata exposes a permanent, unauthenticated per-dataset CSV download
    URL, so `expiry` has no effect here.

    :param tables: A TableInfo object overriding the table params of the source
    :param expiry: Unused: Socrata export URLs do not expire
    :return: Dict of table_name -> list of (mimetype, raw URL)
    """
    target_tables = tables or self.tables
    result: Dict[str, List[Tuple[str, str]]] = {}
    if not target_tables:
        return result

    for table in target_tables:
        merged = {**self.params, **super().get_table_options(table, target_tables)}
        domain = merged.get("domain")
        socrata_id = merged.get("socrata_id")
        # Skip tables lacking the pieces needed to construct the URL.
        if not (domain and socrata_id):
            continue
        result[table] = [
            (
                "text/csv",
                f"https://{domain}/api/views/{socrata_id}/rows.csv?accessType=DOWNLOAD",
            )
        ]

    return result


def generate_socrata_mount_queries(sought_ids, datasets, mountpoint, server_id, tables: TableInfo):
# Local imports since this module gets run from commandline entrypoint on startup.
Expand Down
61 changes: 61 additions & 0 deletions test/splitgraph/ingestion/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,67 @@ def test_csv_data_source_multiple(local_engine_empty):
local_engine_empty.delete_schema("temp_data")


def test_csv_data_source_raw_url(local_engine_empty):
    # Reuses the objects uploaded to Minio by the previous test to exercise
    # the raw-URL functionality.
    presigned = MINIO.presigned_get_object("test_csv", "some_prefix/rdu-weather-history.csv")

    credentials = {
        "s3_access_key": "minioclient",
        "s3_secret_key": "supersecure",
    }

    params = {
        "s3_endpoint": "objectstorage:9000",
        "s3_secure": False,
        "s3_bucket": "test_csv",
        # Put this delimiter in as a canary to make sure table params override server params.
        "delimiter": ",",
    }

    tables = {
        "from_url": ([], {"url": presigned}),
        "from_s3_rdu": ([], {"s3_object": "some_prefix/rdu-weather-history.csv"}),
        "from_s3_encoding": ([], {"s3_object": "some_prefix/encoding-win-1252.csv"}),
        "from_url_broken": ([], {"url": "invalid_url"}),
        "from_s3_broken": ([], {"s3_object": "invalid_object"}),
    }

    source = CSVDataSource(local_engine_empty, credentials, params, tables)

    introspected = unwrap(source.introspect())[0]

    raw_urls = source.get_raw_url(tables=introspected)

    # The broken tables fail introspection, so they don't show up here at all.
    assert raw_urls == {
        "from_url": [("text/csv", presigned)],
        "from_s3_rdu": [("text/csv", mock.ANY)],
        "from_s3_encoding": [("text/csv", mock.ANY)],
    }

    # The S3 tables must have been pre-signed against the test object store.
    for table_name in ("from_s3_rdu", "from_s3_encoding"):
        assert "objectstorage:9000" in raw_urls[table_name][0][1]


def test_csv_data_source_http(local_engine_empty):
source = CSVDataSource(
local_engine_empty,
Expand Down
34 changes: 31 additions & 3 deletions test/splitgraph/ingestion/test_socrata.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
import os
from typing import NamedTuple
from unittest import mock
from unittest.mock import MagicMock, call
from unittest.mock import MagicMock, call, Mock

import pytest
from sodapy import Socrata

from splitgraph.ingestion.socrata.mount import SocrataDataSource
from test.splitgraph.conftest import INGESTION_RESOURCES

from splitgraph.core.types import TableColumn
Expand Down Expand Up @@ -202,7 +204,12 @@ def test_socrata_mounting_slug(local_engine_empty):
socrata.datasets.return_value = socrata_meta
with mock.patch("sodapy.Socrata", return_value=socrata):
mount(
"test/pg_mount", "socrata", {"domain": "example.com", "app_token": "some_token",},
"test/pg_mount",
"socrata",
{
"domain": "example.com",
"app_token": "some_token",
},
)

assert local_engine_empty.get_all_tables("test/pg_mount") == [
Expand Down Expand Up @@ -362,9 +369,30 @@ def test_socrata_smoke(domain, dataset_id, local_engine_empty):
# to make sure the mounting works end-to-end.
try:
mount(
"socrata_mount", "socrata", {"domain": domain, "tables": {"data": dataset_id}},
"socrata_mount",
"socrata",
{"domain": domain, "tables": {"data": dataset_id}},
)
result = local_engine_empty.run_sql("SELECT * FROM socrata_mount.data LIMIT 10")
assert len(result) == 10
finally:
local_engine_empty.delete_schema("socrata_mount")


def test_socrata_data_source_raw_url():
    # No engine interaction is needed to build the export URL, so a Mock works.
    data_source = SocrataDataSource(
        engine=Mock(),
        params={"domain": "data.healthcare.gov"},
        tables={"dataset": ([], {"socrata_id": "7h6f-vws8"})},
        credentials={},
    )

    expected_url = "https://data.healthcare.gov/api/views/7h6f-vws8/rows.csv?accessType=DOWNLOAD"
    assert data_source.get_raw_url() == {"dataset": [("text/csv", expected_url)]}