Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions splitgraph/core/fragment_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,13 +754,12 @@ def create_base_fragment(
if source_schema == "pg_temp" and not table_schema:
raise ValueError("Cannot infer the schema of temporary tables, pass in table_schema!")

# Get schema (apart from the chunk ID column)
# Fragments can't be reused in tables with different schemas
# even if the contents match (e.g. '1' vs 1). Hence, include the table schema
# n the object ID as well.
table_schema = table_schema or [
c for c in self.object_engine.get_full_table_schema(source_schema, source_table)
]
# in the object ID as well.
table_schema = table_schema or self.object_engine.get_full_table_schema(
source_schema, source_table
)

schema_hash = self._calculate_schema_hash(table_schema)
# Get content hash for this chunk.
Expand Down Expand Up @@ -864,6 +863,7 @@ def record_table_as_base(
extra_indexes: Optional[ExtraIndexInfo] = None,
in_fragment_order: Optional[List[str]] = None,
overwrite: bool = False,
table_schema: Optional[TableSchema] = None,
) -> List[str]:
"""
Copies the full table verbatim into one or more new base fragments and registers them.
Expand All @@ -877,6 +877,9 @@ def record_table_as_base(
:param extra_indexes: Dictionary of {index_type: column: index_specific_kwargs}.
:param in_fragment_order: Key to sort data inside each chunk by.
:param overwrite: Overwrite physical objects that already exist.
:param table_schema: Override the columns that will be picked from the original table
(e.g. to change their order or primary keys). Note that the schema must be a subset
of the original schema and this method doesn't verify PK constraints.
"""
source_schema = source_schema or repository.to_schema()
source_table = source_table or table_name
Expand All @@ -888,7 +891,9 @@ def record_table_as_base(
return_shape=ResultShape.ONE_ONE,
)

table_schema = self.object_engine.get_full_table_schema(source_schema, source_table)
table_schema = table_schema or self.object_engine.get_full_table_schema(
source_schema, source_table
)
if chunk_size and table_not_empty:
object_ids = self._chunk_table(
repository,
Expand All @@ -898,6 +903,7 @@ def record_table_as_base(
extra_indexes,
in_fragment_order=in_fragment_order,
overwrite=overwrite,
table_schema=table_schema,
)

elif table_not_empty:
Expand Down
54 changes: 27 additions & 27 deletions splitgraph/ingestion/airbyte/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
from typing import Dict, Any, Iterable, Generator, Optional, List, Tuple

from psycopg2.sql import SQL, Identifier
from target_postgres.db_sync import column_type

from splitgraph.config import DEFAULT_CHUNK_SIZE
Expand All @@ -21,6 +20,21 @@
AirbyteConfig = Dict[str, Any]
AIRBYTE_RAW = "_airbyte_raw"

# By default, the PK on the Airbyte table is _airbyte_ab_id which is a UUID. We also
# cluster by the PK, which means it will be used to detect overlaps and we'll be sorting
# by it, which is bad as currently it forces Splitgraph to materialize a table into PG
# (since it thinks parts of the table overwrite each other). Instead, as a semi-hack,
# we change the table's schema to set the PK as (_airbyte_emitted_at, _airbyte_ab_id)
# which will make it not overlap (as each new chunk will have a different
# _airbyte_emitted_at).
AIRBYTE_RAW_SCHEMA = [
TableColumn(
ordinal=1, name="_airbyte_emitted_at", pg_type="timestamp with time zone", is_pk=True
),
TableColumn(ordinal=2, name="_airbyte_ab_id", pg_type="character varying", is_pk=True),
TableColumn(ordinal=3, name="_airbyte_data", pg_type="jsonb", is_pk=False),
]


def _airbyte_message_reader(
stream: Iterable[bytes],
Expand Down Expand Up @@ -61,42 +75,25 @@ def _store_raw_airbyte_tables(
if not sync_mode:
logging.warning(
"Couldn't detect the sync mode for %s, falling back to %s",
raw_table,
default_sync_mode,
)
sync_mode = default_sync_mode

# By default, the PK on the Airbyte table is _airbyte_ab_id which is a UUID. We also
# cluster by the PK, which means it will be used to detect overlaps and we'll be sorting
# by it, which is bad as currently it forces Splitgraph to materialize a table into PG
# (since it thinks parts of the table overwrite each other). Instead, as a semi-hack,
# we change the table's schema to set the PK as (_airbyte_emitted_at, _airbyte_ab_id)
# which will make it not overlap (as each new chunk will have a different
# _airbyte_emitted_at).

logging.info("Changing the table's primary key...")
repository.object_engine.run_sql(
SQL(
"ALTER TABLE {0}.{1} DROP CONSTRAINT {2}; "
"ALTER TABLE {0}.{1} ADD PRIMARY KEY (_airbyte_emitted_at, _airbyte_ab_id)"
).format(
Identifier(staging_schema), Identifier(raw_table), Identifier(raw_table + "_pkey")
)
)
repository.object_engine.commit()

logging.info("Storing %s. Sync mode: %s", raw_table, sync_mode)
# Make sure the raw table's schema didn't change (very rare, since it's
# just hash, JSON, timestamp)
new_schema = engine.get_full_table_schema(staging_schema, raw_table)
if sync_mode != "overwrite":
try:
current_schema = current_image.get_table(raw_table).table_schema
if current_schema != new_schema:
if current_schema != AIRBYTE_RAW_SCHEMA:
raise AssertionError(
"Schema for %s changed! Old: %s, new: %s",
raw_table,
current_schema,
new_schema,
"Schema for %s changed! Old: %s, new: %s"
% (
raw_table,
current_schema,
AIRBYTE_RAW_SCHEMA,
)
)
except TableNotFoundError:
pass
Expand All @@ -105,7 +102,9 @@ def _store_raw_airbyte_tables(
# current raw table so that record_table_as_base doesn't append objects to the existing
# table.
if sync_mode == "overwrite":
repository.objects.overwrite_table(repository, image_hash, raw_table, new_schema, [])
repository.objects.overwrite_table(
repository, image_hash, raw_table, AIRBYTE_RAW_SCHEMA, []
)

repository.objects.record_table_as_base(
repository,
Expand All @@ -114,6 +113,7 @@ def _store_raw_airbyte_tables(
chunk_size=DEFAULT_CHUNK_SIZE,
source_schema=staging_schema,
source_table=raw_table,
table_schema=AIRBYTE_RAW_SCHEMA,
)

return raw_tables
Expand Down