diff --git a/splitgraph/core/fragment_manager.py b/splitgraph/core/fragment_manager.py index 695144eb..2cfbdd17 100644 --- a/splitgraph/core/fragment_manager.py +++ b/splitgraph/core/fragment_manager.py @@ -754,13 +754,12 @@ def create_base_fragment( if source_schema == "pg_temp" and not table_schema: raise ValueError("Cannot infer the schema of temporary tables, pass in table_schema!") - # Get schema (apart from the chunk ID column) # Fragments can't be reused in tables with different schemas # even if the contents match (e.g. '1' vs 1). Hence, include the table schema - # n the object ID as well. - table_schema = table_schema or [ - c for c in self.object_engine.get_full_table_schema(source_schema, source_table) - ] + # in the object ID as well. + table_schema = table_schema or self.object_engine.get_full_table_schema( + source_schema, source_table + ) schema_hash = self._calculate_schema_hash(table_schema) # Get content hash for this chunk. @@ -864,6 +863,7 @@ def record_table_as_base( extra_indexes: Optional[ExtraIndexInfo] = None, in_fragment_order: Optional[List[str]] = None, overwrite: bool = False, + table_schema: Optional[TableSchema] = None, ) -> List[str]: """ Copies the full table verbatim into one or more new base fragments and registers them. @@ -877,6 +877,9 @@ def record_table_as_base( :param extra_indexes: Dictionary of {index_type: column: index_specific_kwargs}. :param in_fragment_order: Key to sort data inside each chunk by. :param overwrite: Overwrite physical objects that already exist. + :param table_schema: Override the columns that will be picked from the original table + (e.g. to change their order or primary keys). Note that the schema must be a subset + of the original schema and this method doesn't verify PK constraints. """ source_schema = source_schema or repository.to_schema() source_table = source_table or table_name @@ -888,7 +891,9 @@ def record_table_as_base( return_shape=ResultShape.ONE_ONE, ) - table_schema = self.object_engine.get_full_table_schema(source_schema, source_table) + table_schema = table_schema or self.object_engine.get_full_table_schema( + source_schema, source_table + ) if chunk_size and table_not_empty: object_ids = self._chunk_table( repository, @@ -898,6 +903,7 @@ def record_table_as_base( extra_indexes, in_fragment_order=in_fragment_order, overwrite=overwrite, + table_schema=table_schema, ) elif table_not_empty: diff --git a/splitgraph/ingestion/airbyte/utils.py b/splitgraph/ingestion/airbyte/utils.py index c424f821..a71b6c76 100644 --- a/splitgraph/ingestion/airbyte/utils.py +++ b/splitgraph/ingestion/airbyte/utils.py @@ -1,7 +1,6 @@ import logging from typing import Dict, Any, Iterable, Generator, Optional, List, Tuple -from psycopg2.sql import SQL, Identifier from target_postgres.db_sync import column_type from splitgraph.config import DEFAULT_CHUNK_SIZE @@ -21,6 +20,21 @@ AirbyteConfig = Dict[str, Any] AIRBYTE_RAW = "_airbyte_raw" +# By default, the PK on the Airbyte table is _airbyte_ab_id which is a UUID. We also +# cluster by the PK, which means it will be used to detect overlaps and we'll be sorting +# by it, which is bad as currently it forces Splitgraph to materialize a table into PG +# (since it thinks parts of the table overwrite each other). Instead, as a semi-hack, +# we change the table's schema to set the PK as (_airbyte_emitted_at, _airbyte_ab_id) +# which will make it not overlap (as each new chunk will have a different +# _airbyte_emitted_at). +AIRBYTE_RAW_SCHEMA = [ + TableColumn( + ordinal=1, name="_airbyte_emitted_at", pg_type="timestamp with time zone", is_pk=True + ), + TableColumn(ordinal=2, name="_airbyte_ab_id", pg_type="character varying", is_pk=True), + TableColumn(ordinal=3, name="_airbyte_data", pg_type="jsonb", is_pk=False), +] + def _airbyte_message_reader( stream: Iterable[bytes], @@ -61,42 +75,25 @@ def _store_raw_airbyte_tables( if not sync_mode: logging.warning( "Couldn't detect the sync mode for %s, falling back to %s", + raw_table, default_sync_mode, ) sync_mode = default_sync_mode - # By default, the PK on the Airbyte table is _airbyte_ab_id which is a UUID. We also - # cluster by the PK, which means it will be used to detect overlaps and we'll be sorting - # by it, which is bad as currently it forces Splitgraph to materialize a table into PG - # (since it thinks parts of the table overwrite each other). Instead, as a semi-hack, - # we change the table's schema to set the PK as (_airbyte_emitted_at, _airbyte_ab_id) - # which will make it not overlap (as each new chunk will have a different - # _airbyte_emitted_at). - - logging.info("Changing the table's primary key...") - repository.object_engine.run_sql( - SQL( - "ALTER TABLE {0}.{1} DROP CONSTRAINT {2}; " - "ALTER TABLE {0}.{1} ADD PRIMARY KEY (_airbyte_emitted_at, _airbyte_ab_id)" - ).format( - Identifier(staging_schema), Identifier(raw_table), Identifier(raw_table + "_pkey") - ) - ) - repository.object_engine.commit() - logging.info("Storing %s. Sync mode: %s", raw_table, sync_mode) # Make sure the raw table's schema didn't change (very rare, since it's # just hash, JSON, timestamp) - new_schema = engine.get_full_table_schema(staging_schema, raw_table) if sync_mode != "overwrite": try: current_schema = current_image.get_table(raw_table).table_schema - if current_schema != new_schema: + if current_schema != AIRBYTE_RAW_SCHEMA: raise AssertionError( - "Schema for %s changed! Old: %s, new: %s", - raw_table, - current_schema, - new_schema, + "Schema for %s changed! Old: %s, new: %s" + % ( + raw_table, + current_schema, + AIRBYTE_RAW_SCHEMA, + ) ) except TableNotFoundError: pass @@ -105,7 +102,9 @@ def _store_raw_airbyte_tables( # current raw table so that record_table_as_base doesn't append objects to the existing # table. if sync_mode == "overwrite": - repository.objects.overwrite_table(repository, image_hash, raw_table, new_schema, []) + repository.objects.overwrite_table( + repository, image_hash, raw_table, AIRBYTE_RAW_SCHEMA, [] + ) repository.objects.record_table_as_base( repository, @@ -114,6 +113,7 @@ def _store_raw_airbyte_tables( chunk_size=DEFAULT_CHUNK_SIZE, source_schema=staging_schema, source_table=raw_table, + table_schema=AIRBYTE_RAW_SCHEMA, ) return raw_tables