Add support for incremental loads to FDW plugins
Add a `cursor_columns` field to the table parameters (used as a list
of columns that form an increasing-only replication bookmark). Store the
ingestion state in a table inside of the image, similarly to Airbyte.
mildbyte committed Mar 9, 2022
1 parent 74a3ec8 commit 0bd3a89
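
For context, a rough sketch of what a table-parameters entry carrying the new field might look like. The parameter schema itself is not part of this diff, so every key other than `cursor_columns` is an assumption for illustration:

    # Hypothetical table parameters for an FDW-backed data source. Only
    # "cursor_columns" is introduced by this commit; the surrounding keys
    # are assumed for the sake of the example.
    table_params = {
        "schema_name": "public",   # assumed key
        "table_name": "orders",    # assumed key
        # Columns forming the increasing-only replication bookmark: a sync
        # only fetches rows whose (updated_at, id) tuple exceeds the cursor
        # values stored in the image's ingestion-state table.
        "cursor_columns": ["updated_at", "id"],
    }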
Showing 9 changed files with 501 additions and 102 deletions.
45 changes: 33 additions & 12 deletions splitgraph/engine/base.py
@@ -1,8 +1,9 @@
+import itertools
 import re
 import threading
 from abc import ABC
 from contextlib import contextmanager
-from typing import Any, Iterator, List, Optional, Sequence, Tuple, Union, cast
+from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, Union, cast
 
 from psycopg2.sql import SQL, Composed, Identifier
 
@@ -134,22 +135,42 @@ def copy_table(
         target_schema: str,
         target_table: str,
         with_pk_constraints: bool = True,
+        cursor_fields: Optional[Dict[str, str]] = None,
     ) -> None:
         """Copy a table in the same engine, optionally applying primary key constraints as well."""
 
+        if cursor_fields:
+            where_clause = (
+                SQL(" WHERE (")
+                + SQL(",").join(map(Identifier, cursor_fields))
+                + SQL(") > (")
+                + SQL(",").join(itertools.repeat(SQL("%s"), len(cursor_fields)))
+                + SQL(")")
+            )
+            query_args = list(cursor_fields.values())
+        else:
+            where_clause = SQL("")
+            query_args = []
+
         if not self.table_exists(target_schema, target_table):
-            query = SQL("CREATE TABLE {}.{} AS SELECT * FROM {}.{}").format(
-                Identifier(target_schema),
-                Identifier(target_table),
-                Identifier(source_schema),
-                Identifier(source_table),
+            query = (
+                SQL("CREATE TABLE {}.{} AS SELECT * FROM {}.{}").format(
+                    Identifier(target_schema),
+                    Identifier(target_table),
+                    Identifier(source_schema),
+                    Identifier(source_table),
+                )
+                + where_clause
             )
         else:
-            query = SQL("INSERT INTO {}.{} SELECT * FROM {}.{}").format(
-                Identifier(target_schema),
-                Identifier(target_table),
-                Identifier(source_schema),
-                Identifier(source_table),
+            query = (
+                SQL("INSERT INTO {}.{} SELECT * FROM {}.{}").format(
+                    Identifier(target_schema),
+                    Identifier(target_table),
+                    Identifier(source_schema),
+                    Identifier(source_table),
+                )
+                + where_clause
             )
         pks = self.get_primary_keys(source_schema, source_table)
 
@@ -161,7 +182,7 @@ def copy_table(
                 + SQL(",").join(SQL("{}").format(Identifier(c)) for c, _ in pks)
                 + SQL(")")
             )
-        self.run_sql(query)
+        self.run_sql(query, query_args)
 
     def delete_table(self, schema: str, table: str) -> None:
         """Drop a table from a schema if it exists"""
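To make the new code path concrete, here is a small standalone sketch of the filter that `copy_table` now appends when `cursor_fields` is passed. It mirrors the psycopg2.sql composition above using plain strings (the real code quotes column names via `Identifier`); the column names and bookmark values are made up:

    # Mirror of the WHERE clause built in copy_table, using plain strings so
    # it runs without a database. Column names and values are hypothetical.
    cursor_fields = {"updated_at": "2022-03-01T00:00:00", "id": "42"}

    columns = ",".join(cursor_fields)                     # dicts preserve insertion order
    placeholders = ",".join(["%s"] * len(cursor_fields))  # one placeholder per column
    where_clause = f" WHERE ({columns}) > ({placeholders})"
    query_args = list(cursor_fields.values())

    print(where_clause)  # -> WHERE (updated_at,id) > (%s,%s)  (note the leading space)
    print(query_args)    # -> ['2022-03-01T00:00:00', '42']

PostgreSQL compares the two row constructors lexicographically, so the CREATE TABLE AS / INSERT INTO queries only pick up rows strictly past the stored bookmark, which is what makes repeated `copy_table` calls incremental rather than full reloads.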
12 changes: 1 addition & 11 deletions splitgraph/hooks/data_source/__init__.py
@@ -1,9 +1,8 @@
 import logging
 import os
 import types
-from copy import deepcopy
 from importlib import import_module
-from typing import Any, Dict, List, Optional, Type, cast
+from typing import Dict, List, Optional, Type, cast
 
 from ...config import CONFIG
 from ...config.config import get_all_in_section, get_singleton
@@ -151,12 +150,3 @@ def _load_source(source_name, source_class_name):
         get_singleton(CONFIG, "SG_CONFIG_FILE"),
     )
     return data_source
-
-
-def merge_jsonschema(left: Dict[str, Any], right: Dict[str, Any]) -> Dict[str, Any]:
-    result = deepcopy(left)
-    result["properties"] = {**result["properties"], **right.get("properties", {})}
-    result["required"] = result.get("required", []) + [
-        r for r in right.get("required", []) if r not in result.get("required", [])
-    ]
-    return result
(Diffs for the remaining 7 changed files are not shown.)