Skip to content

Conversation

@iambriccardo
Copy link
Contributor

@iambriccardo iambriccardo commented Nov 27, 2025

Summary

This PR introduces replication masks, a new mechanism for handling table schemas in ETL that decouples column-level filtering from schema loading.

Motivation

The key insight is that we can load the entire table schema independently of column-level filtering in replication, then rely on Relation messages to determine which columns to actually replicate.

Changes

Replication Masks

A replication mask is a bitmask that determines which columns of a TableSchema are actively replicated at any given time. Creating a mask requires:

  • A set of active column names (from the Relation message)
  • The latest TableSchema of the table (we are assuming that the last table schema stored is synced with the incoming Relation message, thus matching by column name is sufficient)

These are combined in ReplicatedTableSchema, a wrapper type that exposes only active replicated columns on top of a stable TableSchema. This allows columns to be added or removed from a publication without breaking the pipeline (assuming the destination supports missing column data, BigQuery and Iceberg will currently fail).

Destination Schema Handling

Previously, schemas were loaded by passing the SchemaStore to the destination. This caused semantic issues, for example, truncate_table relied on assumptions about whether the schema was present or not.

The new design supplies a ReplicatedTableSchema with each event, eliminating schema loading in the destination and enforcing invariants at compile time via the type system. This also enables future support for multiple schema versions within a single batch of events, which will be critical for schema change support.

Consistent Schema Loading

To ensure schema consistency between initial table copy and DDL event triggers, we now define a Postgres function describe_table_schema that returns schema data in a consistent structure. Schema change messages are emitted in the replication stream within the same transaction that modifies the schema.

More Schema Information

With the new shared schema query, we also load ordinal positions of primary keys, that enables us to create composite primary keys in downstream destinations.

DDL Event Trigger

We also have a new DDL event trigger which will be used to dispatch schema change events (ALTER TABLE statements) in a transactionally consistent way. This is doable since Postgres runs event triggers within the transaction that triggered them and they are blocking, so when an ALTER TABLE is executed, the SQL function is executed, producing the logical replication message in same transaction as the transaction modifying the table. No statements are ALTER TABLE are run until the event trigger is executed successfully.

This will be the foundational element needed for supporting schema changes.

Future Work

Follow-up PRs will leverage the DDL message for full schema change support. For now, it's included here to validate consistency.

@iambriccardo iambriccardo changed the title Improve feat(experimental): Add DDL trigger for data changes Nov 27, 2025
pg_escape = { version = "0.1.1", default-features = false }
pin-project-lite = { version = "0.2.16", default-features = false }
postgres-replication = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b" }
postgres-replication = { git = "https://github.com/iambriccardo/rust-postgres", default-features = false, rev = "31acf55c7e5c2244e5bb3a36e7afa2a01bf52c38" }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used my fork which supports Message logical replication messages.

@coveralls
Copy link

coveralls commented Dec 1, 2025

Pull Request Test Coverage Report for Build 20065122958

Details

  • 996 of 1158 (86.01%) changed or added relevant lines in 31 files are covered.
  • 28 unchanged lines in 7 files lost coverage.
  • Overall coverage increased (+0.1%) to 82.082%

Changes Missing Coverage Covered Lines Changed/Added Lines %
etl-destinations/src/iceberg/core.rs 101 102 99.02%
etl-destinations/src/bigquery/client.rs 111 113 98.23%
etl-replicator/src/core.rs 0 3 0.0%
etl/src/test_utils/materialize.rs 0 3 0.0%
etl/src/types/event.rs 1 4 25.0%
etl/src/destination/memory.rs 8 12 66.67%
etl/src/pipeline.rs 5 9 55.56%
etl/src/store/both/memory.rs 0 4 0.0%
etl/src/test_utils/test_destination_wrapper.rs 10 14 71.43%
etl/src/replication/client.rs 81 86 94.19%
Files with Coverage Reduction New Missed Lines %
etl-replicator/src/core.rs 1 0.0%
etl/src/test_utils/materialize.rs 1 57.14%
etl/src/test_utils/test_schema.rs 1 90.88%
etl/src/types/event.rs 1 40.0%
etl/src/replication/apply.rs 3 84.87%
etl/src/state/table.rs 4 61.11%
etl/src/workers/table_sync.rs 17 80.62%
Totals Coverage Status
Change from base Build 19957625863: 0.1%
Covered Lines: 16826
Relevant Lines: 20499

💛 - Coveralls

// Run replicator migrations to create the state store tables.
sqlx::migrate!("../etl-replicator/migrations")
// Run migrations to create the etl tables.
sqlx::migrate!("../etl/migrations")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided to move migrations into etl itself since now they are required for ETL to work, independently of which store implementation it's used.

/// The 1-based ordinal position of the column in the table.
pub ordinal_position: i32,
/// The 1-based ordinal position of this column in the primary key, or None if not a primary key.
pub primary_key_ordinal_position: Option<i32>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used to properly create a composite primary key definition on the destination.

@iambriccardo iambriccardo changed the title feat(experimental): Add DDL trigger for data changes feat(experimental): Rework schema handling Dec 2, 2025
@iambriccardo iambriccardo changed the title feat(experimental): Rework schema handling feat(experimental): Rework schema handling with replication masks Dec 2, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants