Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
e210ea7
Move some common Singer ingestion routines into separate functions th…
mildbyte Jul 12, 2021
60227bc
Allow `copy_to_container` to take in the actual data rather than just…
mildbyte Jul 12, 2021
4e97d12
Fix circular import: move types into `_utils` and rename `_utils` int…
mildbyte Jul 12, 2021
03f52b8
Allow empty data.
mildbyte Jul 12, 2021
1c6c8da
Make `Image._lq_checkout` public, allow it to take in a list of table…
mildbyte Jul 14, 2021
e16fdc2
Initial implementation of an Airbyte <> Splitgraph data source shim
mildbyte Jul 19, 2021
5be1cc2
Add Airbyte to the requirements as an extra and install it in tests.
mildbyte Jul 19, 2021
a6039c6
Break Docker network mode / host detection into a separate routine an…
mildbyte Jul 19, 2021
7f80065
Airbyte log parsing fixes: decode the logs in case of an error and em…
mildbyte Jul 19, 2021
7a7725c
Add initial test suite for Airbyte (introspection/catalog manip)
mildbyte Jul 19, 2021
514aa84
Use two different configurations for the source and the destination (…
mildbyte Jul 19, 2021
e82f596
Always stub out the namespace in the messages between the source and …
mildbyte Jul 19, 2021
7604b6c
Use a custom image comment for Airbyte-generated images.
mildbyte Jul 19, 2021
dfe0128
Misc fixes to incremental loads and errors.
mildbyte Jul 19, 2021
15598a2
Add more unit and end-to-end tests for Airbyte loads (uses the MySQL …
mildbyte Jul 19, 2021
a15d7af
Add missing `__init__.py` to Singer
mildbyte Jul 19, 2021
439987c
Delete airbyte-cdk from the deps and instead copy the Pydantic models…
mildbyte Jul 19, 2021
d9c4dd8
Fix imports in tests and remove the pytest import guard (don't need `…
mildbyte Jul 19, 2021
43bf9f9
Add `chardet` as a dependency (went away after `requests` was upgraded?)
mildbyte Jul 19, 2021
2511981
[CU-15xp9xt] Delete the `airbyte` extra from the installation script …
mildbyte Jul 19, 2021
578c2f9
Use the Airbyte message reader iterator in introspection to avoid iss…
mildbyte Jul 20, 2021
5302bef
Move Docker utilities out of `splitgraph.commandline` to avoid circul…
mildbyte Jul 20, 2021
9fefa06
Merge branch 'master' into feature/airbyte-data-source-CU-15xp9xt
mildbyte Jul 20, 2021
2ef8567
Factor the `DEFAULT_CHUNK_SIZE` out into a variable and use it throug…
mildbyte Jul 21, 2021
62d4e71
Allow overriding the Docker environment in Airbyte containers.
mildbyte Jul 21, 2021
0b091f9
Add support for overriding the cursor field / PK in Airbyte-backed da…
mildbyte Jul 21, 2021
6cd734a
Fix table param override flow in Singer too.
mildbyte Jul 21, 2021
ed4d422
Add the PK/cursor to the Airbyte stream even when in load mode.
mildbyte Jul 21, 2021
55bdb38
Change `table_schema_params_to_dict` to support non-string parameters…
mildbyte Jul 21, 2021
cb1b295
Validate table params against the JSONSchema if they're passed into a…
mildbyte Jul 21, 2021
8e5c387
Fix a test (we set the cursor field from the connector's default in l…
mildbyte Jul 21, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 116 additions & 63 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pyyaml = ">=5.1"
jsonschema = ">=3.1.0"
cryptography = ">=3.4.0"
pydantic = ">=1.8.1"
chardet = "^4.0.0"

# Socrata dataset mounting.
# This could be optional but it's very lightweight (only requires requests).
Expand Down
45 changes: 2 additions & 43 deletions splitgraph/commandline/engine.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import logging
import os
import platform
import time
from io import BytesIO
from pathlib import Path, PureWindowsPath
from tarfile import TarFile, TarInfo
from typing import Dict, TYPE_CHECKING
from urllib.parse import urlparse

Expand All @@ -14,53 +11,15 @@
from splitgraph.__version__ import __version__
from splitgraph.config import CONFIG, SG_CMD_ASCII
from splitgraph.exceptions import DockerUnavailableError, EngineSetupError
from splitgraph.utils.docker import get_docker_client, copy_to_container

if TYPE_CHECKING:
from docker.models.containers import Container
pass


DEFAULT_ENGINE = "default"


def get_docker_client():
    """Create a Docker client from the environment and confirm the daemon
    is actually reachable by pinging it.

    :raises DockerUnavailableError: if the client cannot be built or the
        daemon does not respond to the ping.
    """
    import docker

    try:
        docker_client = docker.from_env()
        # Ping eagerly so callers fail fast instead of erroring on first use.
        docker_client.ping()
    except Exception as e:
        raise DockerUnavailableError("Could not connect to the Docker daemon") from e
    return docker_client


def copy_to_container(container: "Container", source_path: str, target_path: str) -> None:
    """
    Copy a file into a Docker container.

    The file is wrapped in an in-memory tar archive, since the Docker API
    only supports uploading archives into containers.

    :param container: Container object
    :param source_path: Source file path
    :param target_path: Target file path (in the container)
    :return:
    """
    # https://github.com/docker/docker-py/issues/1771
    with open(source_path, "rb") as f:
        data = f.read()

    tarinfo = TarInfo(name=os.path.basename(target_path))
    tarinfo.size = len(data)
    tarinfo.mtime = int(time.time())

    stream = BytesIO()
    # Use a context manager so the archive is finalized (and the handle
    # released) even if addfile raises partway through.
    with TarFile(fileobj=stream, mode="w") as tar:
        tar.addfile(tarinfo, BytesIO(data))

    container.put_archive(path=os.path.dirname(target_path), data=stream.getvalue())


def patch_and_save_config(config, patch):
from splitgraph.config.config import patch_config
from splitgraph.config.system_config import HOME_SUB_DIR
Expand Down
4 changes: 2 additions & 2 deletions splitgraph/commandline/image_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import click

from splitgraph.commandline.common import ImageType, RepositoryType, JsonType, remote_switch_option
from splitgraph.config import get_singleton, CONFIG
from splitgraph.config import DEFAULT_CHUNK_SIZE
from splitgraph.exceptions import TableNotFoundError


Expand Down Expand Up @@ -72,7 +72,7 @@ def checkout_c(image_spec, force, uncheckout, layered):
@click.option(
"-c",
"--chunk-size",
default=int(get_singleton(CONFIG, "SG_COMMIT_CHUNK_SIZE")),
default=DEFAULT_CHUNK_SIZE,
type=int,
help="Split new tables into chunks of this many rows (by primary key). The default "
"value is governed by the SG_COMMIT_CHUNK_SIZE configuration parameter.",
Expand Down
2 changes: 2 additions & 0 deletions splitgraph/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

SG_CMD_ASCII = get_singleton(CONFIG, "SG_CMD_ASCII") == "true"

DEFAULT_CHUNK_SIZE = int(get_singleton(CONFIG, "SG_COMMIT_CHUNK_SIZE"))

REMOTES = list(CONFIG.get("remotes", []))

# This is a global variable that gets flipped to True by the Multicorn FDW class
Expand Down
14 changes: 10 additions & 4 deletions splitgraph/core/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,17 @@ def checkout(self, force: bool = False, layered: bool = False) -> None:
self.object_engine.delete_table(target_schema, table)

if layered:
self._lq_checkout()
self.lq_checkout()
else:
for table in self.get_tables():
self.get_table(table).materialize(table)
set_head(self.repository, self.image_hash)

def _lq_checkout(
self, target_schema: Optional[str] = None, wrapper: Optional[str] = FDW_CLASS
def lq_checkout(
self,
target_schema: Optional[str] = None,
wrapper: Optional[str] = FDW_CLASS,
only_tables: Optional[List[str]] = None,
) -> None:
"""
Intended to be run on the sgr side. Initializes the FDW for all tables in a given image,
Expand Down Expand Up @@ -198,6 +201,9 @@ def _lq_checkout(

# It's easier to create the foreign tables from our side than to implement IMPORT FOREIGN SCHEMA by the FDW
for table_name in self.get_tables():
if only_tables and table_name not in only_tables:
continue

logging.debug(
"Mounting %s:%s/%s into %s",
self.repository.to_schema(),
Expand All @@ -220,7 +226,7 @@ def query_schema(
tmp_schema = str.format("o{:032x}", getrandbits(128))
try:
self.object_engine.create_schema(tmp_schema)
self._lq_checkout(target_schema=tmp_schema, wrapper=wrapper)
self.lq_checkout(target_schema=tmp_schema, wrapper=wrapper)
if commit:
self.object_engine.commit() # Make sure the new tables are seen by other connections

Expand Down
10 changes: 8 additions & 2 deletions splitgraph/core/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ def unwrap(
return good, bad


def get_table_params(table_info: TableInfo, table_name: str) -> TableParams:
    """Look up the ingestion parameters for a single table.

    Returns empty parameters when ``table_info`` isn't a mapping or has
    no entry for ``table_name``.
    """
    if isinstance(table_info, dict):
        try:
            return table_info[table_name][1]
        except KeyError:
            pass
    return TableParams({})


class Comparable(metaclass=ABCMeta):
@abstractmethod
def __lt__(self, other: Any) -> bool:
Expand All @@ -91,11 +97,11 @@ def dict_to_table_schema_params(

def table_schema_params_to_dict(
tables: Dict[str, Tuple[TableSchema, TableParams]]
) -> Dict[str, Dict[str, Dict[str, str]]]:
) -> Dict[str, Dict[str, Dict[str, Any]]]:
return {
t: {
"schema": {c.name: c.pg_type for c in ts},
"options": {tpk: str(tpv) for tpk, tpv in tp.items()},
"options": tp,
}
for t, (ts, tp) in tables.items()
}
20 changes: 14 additions & 6 deletions splitgraph/hooks/data_source/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from psycopg2._json import Json
from psycopg2.sql import SQL, Identifier

from splitgraph.config import DEFAULT_CHUNK_SIZE
from splitgraph.core.engine import repository_exists
from splitgraph.core.image import Image
from splitgraph.core.types import (
Expand Down Expand Up @@ -68,11 +69,16 @@ def __init__(
self.credentials = credentials
self.params = params

self._validate_table_params(tables)
self.tables = tables

@classmethod
def _validate_table_params(cls, tables: Optional[TableInfo]) -> None:
import jsonschema

if isinstance(tables, dict):
for _, table_params in tables.values():
jsonschema.validate(instance=table_params, schema=self.table_params_schema)

self.tables = tables
jsonschema.validate(instance=table_params, schema=cls.table_params_schema)

@abstractmethod
def introspect(self) -> IntrospectionResult:
Expand Down Expand Up @@ -114,6 +120,7 @@ def _load(self, schema: str, tables: Optional[TableInfo] = None):
raise NotImplementedError

def load(self, repository: "Repository", tables: Optional[TableInfo] = None) -> str:
self._validate_table_params(tables)
if not repository_exists(repository):
repository.init()

Expand All @@ -132,7 +139,7 @@ def load(self, repository: "Repository", tables: Optional[TableInfo] = None) ->
head=None,
image_hash=image_hash,
snap_only=True,
chunk_size=100000,
chunk_size=DEFAULT_CHUNK_SIZE,
schema=tmp_schema,
)
finally:
Expand All @@ -159,6 +166,7 @@ def sync(
image_hash: Optional[str],
tables: Optional[TableInfo] = None,
) -> str:
self._validate_table_params(tables)
if not repository_exists(repository):
repository.init()

Expand Down Expand Up @@ -213,7 +221,7 @@ def get_ingestion_state(repository: "Repository", image_hash: Optional[str]) ->


def prepare_new_image(
repository: "Repository", hash_or_tag: Optional[str]
repository: "Repository", hash_or_tag: Optional[str], comment: str = "Singer tap ingestion"
) -> Tuple[Optional[Image], str]:
new_image_hash = "{:064x}".format(getrandbits(256))
if repository_exists(repository):
Expand All @@ -235,5 +243,5 @@ def prepare_new_image(
)
else:
base_image = None
repository.images.add(parent_id=None, image=new_image_hash, comment="Singer tap ingestion")
repository.images.add(parent_id=None, image=new_image_hash, comment=comment)
return base_image, new_image_hash
1 change: 1 addition & 0 deletions splitgraph/hooks/data_source/fdw.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def mount(
tables: Optional[TableInfo] = None,
overwrite: bool = True,
) -> Optional[List[MountError]]:
self._validate_table_params(tables)
tables = tables or self.tables or []

fdw = self.get_fdw_name()
Expand Down
Empty file.
Loading