From ec98786e4fec0afdfb125ce42b078286e254b307 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Tue, 3 Aug 2021 21:56:26 +0100 Subject: [PATCH 1/3] Allow overriding the table schema when building an object (e.g. to select a subset of columns, claim a different PK or reorder columns). This is slightly hacky, but the previous hack didn't work as it still treated the PK as (UUID, ingestion timestamp), so, since PKs compare with UUID first, it would still overlap with other PKs from earlier partitions. --- splitgraph/core/fragment_manager.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/splitgraph/core/fragment_manager.py b/splitgraph/core/fragment_manager.py index 695144eb..2cfbdd17 100644 --- a/splitgraph/core/fragment_manager.py +++ b/splitgraph/core/fragment_manager.py @@ -754,13 +754,12 @@ def create_base_fragment( if source_schema == "pg_temp" and not table_schema: raise ValueError("Cannot infer the schema of temporary tables, pass in table_schema!") - # Get schema (apart from the chunk ID column) # Fragments can't be reused in tables with different schemas # even if the contents match (e.g. '1' vs 1). Hence, include the table schema - # n the object ID as well. - table_schema = table_schema or [ - c for c in self.object_engine.get_full_table_schema(source_schema, source_table) - ] + # in the object ID as well. + table_schema = table_schema or self.object_engine.get_full_table_schema( + source_schema, source_table + ) schema_hash = self._calculate_schema_hash(table_schema) # Get content hash for this chunk. @@ -864,6 +863,7 @@ def record_table_as_base( extra_indexes: Optional[ExtraIndexInfo] = None, in_fragment_order: Optional[List[str]] = None, overwrite: bool = False, + table_schema: Optional[TableSchema] = None, ) -> List[str]: """ Copies the full table verbatim into one or more new base fragments and registers them. @@ -877,6 +877,9 @@ def record_table_as_base( :param extra_indexes: Dictionary of {index_type: column: index_specific_kwargs}. :param in_fragment_order: Key to sort data inside each chunk by. :param overwrite: Overwrite physical objects that already exist. + :param table_schema: Override the columns that will be picked from the original table + (e.g. to change their order or primary keys). Note that the schema must be a subset + of the original schema and this method doesn't verify PK constraints. """ source_schema = source_schema or repository.to_schema() source_table = source_table or table_name @@ -888,7 +891,9 @@ def record_table_as_base( return_shape=ResultShape.ONE_ONE, ) - table_schema = self.object_engine.get_full_table_schema(source_schema, source_table) + table_schema = table_schema or self.object_engine.get_full_table_schema( + source_schema, source_table + ) if chunk_size and table_not_empty: object_ids = self._chunk_table( repository, @@ -898,6 +903,7 @@ def record_table_as_base( extra_indexes, in_fragment_order=in_fragment_order, overwrite=overwrite, + table_schema=table_schema, ) elif table_not_empty: From d32b47d8abe4bdfc6ddec845b2e18a0e9593e6b2 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Tue, 3 Aug 2021 21:57:12 +0100 Subject: [PATCH 2/3] When storing Airbyte tables, enforce a schema which sets _emitted_at, _id as the PK to fix partitioning. --- splitgraph/ingestion/airbyte/utils.py | 45 +++++++++++++-------------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/splitgraph/ingestion/airbyte/utils.py b/splitgraph/ingestion/airbyte/utils.py index c424f821..1fdc4729 100644 --- a/splitgraph/ingestion/airbyte/utils.py +++ b/splitgraph/ingestion/airbyte/utils.py @@ -1,7 +1,6 @@ import logging from typing import Dict, Any, Iterable, Generator, Optional, List, Tuple -from psycopg2.sql import SQL, Identifier from target_postgres.db_sync import column_type from splitgraph.config import DEFAULT_CHUNK_SIZE @@ -21,6 +20,21 @@ AirbyteConfig = Dict[str, Any] AIRBYTE_RAW = "_airbyte_raw" +# By default, the PK on the Airbyte table is _airbyte_ab_id which is a UUID. We also +# cluster by the PK, which means it will be used to detect overlaps and we'll be sorting +# by it, which is bad as currently it forces Splitgraph to materialize a table into PG +# (since it thinks parts of the table overwrite each other). Instead, as a semi-hack, +# we change the table's schema to set the PK as (_airbyte_emitted_at, _airbyte_ab_id) +# which will make it not overlap (as each new chunk will have a different +# _airbyte_emitted_at). +AIRBYTE_RAW_SCHEMA = [ + TableColumn( + ordinal=1, name="_airbyte_emitted_at", pg_type="timestamp with time zone", is_pk=True + ), + TableColumn(ordinal=2, name="_airbyte_ab_id", pg_type="character varying", is_pk=True), + TableColumn(ordinal=3, name="_airbyte_data", pg_type="jsonb", is_pk=False), +] + def _airbyte_message_reader( stream: Iterable[bytes], @@ -65,38 +79,18 @@ def _store_raw_airbyte_tables( ) sync_mode = default_sync_mode - # By default, the PK on the Airbyte table is _airbyte_ab_id which is a UUID. We also - # cluster by the PK, which means it will be used to detect overlaps and we'll be sorting - # by it, which is bad as currently it forces Splitgraph to materialize a table into PG - # (since it thinks parts of the table overwrite each other). Instead, as a semi-hack, - # we change the table's schema to set the PK as (_airbyte_emitted_at, _airbyte_ab_id) - # which will make it not overlap (as each new chunk will have a different - # _airbyte_emitted_at). - - logging.info("Changing the table's primary key...") - repository.object_engine.run_sql( - SQL( - "ALTER TABLE {0}.{1} DROP CONSTRAINT {2}; " - "ALTER TABLE {0}.{1} ADD PRIMARY KEY (_airbyte_emitted_at, _airbyte_ab_id)" - ).format( - Identifier(staging_schema), Identifier(raw_table), Identifier(raw_table + "_pkey") - ) - ) - repository.object_engine.commit() - logging.info("Storing %s. Sync mode: %s", raw_table, sync_mode) # Make sure the raw table's schema didn't change (very rare, since it's # just hash, JSON, timestamp) - new_schema = engine.get_full_table_schema(staging_schema, raw_table) if sync_mode != "overwrite": try: current_schema = current_image.get_table(raw_table).table_schema - if current_schema != new_schema: + if current_schema != AIRBYTE_RAW_SCHEMA: raise AssertionError( "Schema for %s changed! Old: %s, new: %s", raw_table, current_schema, - new_schema, + AIRBYTE_RAW_SCHEMA, ) except TableNotFoundError: pass @@ -105,7 +99,9 @@ def _store_raw_airbyte_tables( # current raw table so that record_table_as_base doesn't append objects to the existing # table. if sync_mode == "overwrite": - repository.objects.overwrite_table(repository, image_hash, raw_table, new_schema, []) + repository.objects.overwrite_table( + repository, image_hash, raw_table, AIRBYTE_RAW_SCHEMA, [] + ) repository.objects.record_table_as_base( repository, @@ -114,6 +110,7 @@ def _store_raw_airbyte_tables( chunk_size=DEFAULT_CHUNK_SIZE, source_schema=staging_schema, source_table=raw_table, + table_schema=AIRBYTE_RAW_SCHEMA, ) return raw_tables From a277becc2d85b3e96f47ed5607b9d9589305ed40 Mon Sep 17 00:00:00 2001 From: Artjoms Iskovs Date: Tue, 3 Aug 2021 22:08:19 +0100 Subject: [PATCH 3/3] Fix some messages. --- splitgraph/ingestion/airbyte/utils.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/splitgraph/ingestion/airbyte/utils.py b/splitgraph/ingestion/airbyte/utils.py index 1fdc4729..a71b6c76 100644 --- a/splitgraph/ingestion/airbyte/utils.py +++ b/splitgraph/ingestion/airbyte/utils.py @@ -75,6 +75,7 @@ def _store_raw_airbyte_tables( if not sync_mode: logging.warning( "Couldn't detect the sync mode for %s, falling back to %s", + raw_table, default_sync_mode, ) sync_mode = default_sync_mode @@ -87,10 +88,12 @@ def _store_raw_airbyte_tables( current_schema = current_image.get_table(raw_table).table_schema if current_schema != AIRBYTE_RAW_SCHEMA: raise AssertionError( - "Schema for %s changed! Old: %s, new: %s", - raw_table, - current_schema, - AIRBYTE_RAW_SCHEMA, + "Schema for %s changed! Old: %s, new: %s" + % ( + raw_table, + current_schema, + AIRBYTE_RAW_SCHEMA, + ) ) except TableNotFoundError: pass