Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/how-to/configure-postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,28 @@ CREATE PUBLICATION all_tables FOR ALL TABLES;
CREATE PUBLICATION inserts_only FOR TABLE users WITH (publish = 'insert');
```

#### Partitioned Tables

If you want to replicate partitioned tables, you must use `publish_via_partition_root = true` when creating your publication. This option tells Postgres to treat the [partitioned table as a single table](https://www.postgresql.org/docs/current/sql-createpublication.html#SQL-CREATEPUBLICATION-PARAMS-WITH-PUBLISH-VIA-PARTITION-ROOT) from the replication perspective, rather than replicating each partition individually. All changes to any partition will be published as changes to the parent table:

```sql
-- Create publication with partitioned table support
CREATE PUBLICATION my_publication FOR TABLE users, orders WITH (publish_via_partition_root = true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: use lowercase SQL.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we want to have lowercase in user facing docs? Most docs have the uppercase convention and I felt like it would be better to follow their lead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our existing docs have mostly lowercase SQL. You can check it out in the docs folder.


-- For all tables including partitioned tables
CREATE PUBLICATION all_tables FOR ALL TABLES WITH (publish_via_partition_root = true);
```

**Limitation:** If this option is enabled, `TRUNCATE` operations performed directly on individual partitions are not replicated. To replicate a truncate operation, you must execute it on the parent table instead:

```sql
-- This will NOT be replicated
TRUNCATE TABLE orders_2024_q1;

-- This WILL be replicated
TRUNCATE TABLE orders;
```

### Managing Publications

```sql
Expand Down
3 changes: 3 additions & 0 deletions etl-api/src/db/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ pub async fn create_publication(
}
}

// Ensure partitioned tables publish via ancestor/root schema for logical replication
query.push_str(" with (publish_via_partition_root = true)");

pool.execute(query.as_str()).await?;
Ok(())
}
Expand Down
36 changes: 26 additions & 10 deletions etl-postgres/src/tokio/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,27 @@ impl<G: GenericClient> PgDatabase<G> {
self.server_version
}

/// Creates a Postgres publication for the specified tables.
/// Creates a Postgres publication for the specified tables with an optional configuration
/// parameter.
///
/// Sets up logical replication by creating a publication that includes
/// the given tables for change data capture.
pub async fn create_publication(
/// Use this method only in the small subset of tests that need to override the default
/// publication options.
pub async fn create_publication_with_config(
&self,
publication_name: &str,
table_names: &[TableName],
publish_via_partition_root: bool,
) -> Result<(), tokio_postgres::Error> {
let table_names = table_names
.iter()
.map(TableName::as_quoted_identifier)
.collect::<Vec<_>>();

let create_publication_query = format!(
"create publication {} for table {}",
"create publication {} for table {} with (publish_via_partition_root = {})",
publication_name,
table_names.join(", ")
table_names.join(", "),
publish_via_partition_root
);
self.client
.as_ref()
Expand All @@ -74,6 +77,16 @@ impl<G: GenericClient> PgDatabase<G> {
Ok(())
}

/// Creates a Postgres publication for the specified tables.
///
/// Delegates to `create_publication_with_config`, passing `true` for
/// `publish_via_partition_root`. Any error returned by that call is
/// propagated unchanged.
pub async fn create_publication(
&self,
publication_name: &str,
table_names: &[TableName],
) -> Result<(), tokio_postgres::Error> {
self.create_publication_with_config(publication_name, table_names, true)
.await
}

pub async fn create_publication_for_all(
&self,
publication_name: &str,
Expand All @@ -87,9 +100,11 @@ impl<G: GenericClient> PgDatabase<G> {
// PostgreSQL 15+ supports FOR TABLES IN SCHEMA syntax
let create_publication_query = match schema {
Some(schema_name) => format!(
"create publication {publication_name} for tables in schema {schema_name}"
"create publication {publication_name} for tables in schema {schema_name} with (publish_via_partition_root = true)"
),
None => format!(
"create publication {publication_name} for all tables with (publish_via_partition_root = true)"
),
None => format!("create publication {publication_name} for all tables"),
};

client.execute(&create_publication_query, &[]).await?;
Expand All @@ -115,8 +130,9 @@ impl<G: GenericClient> PgDatabase<G> {
}
}
None => {
let create_publication_query =
format!("create publication {publication_name} for all tables");
let create_publication_query = format!(
"create publication {publication_name} for all tables with (publish_via_partition_root = true)"
);
client.execute(&create_publication_query, &[]).await?;
}
}
Expand Down
35 changes: 35 additions & 0 deletions etl/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,41 @@ where
publication_table_ids.len()
);

// Validate that the publication is configured correctly for partitioned tables.
//
// When `publish_via_partition_root = false`, logical replication messages contain
// child partition OIDs instead of parent table OIDs. Since our schema cache only
// contains parent table IDs (from `get_publication_table_ids`), relation messages
// with child OIDs would cause pipeline failures.
let publish_via_partition_root = replication_client
.get_publish_via_partition_root(&self.config.publication_name)
.await?;

if !publish_via_partition_root {
let has_partitioned_tables = replication_client
.has_partitioned_tables(&publication_table_ids)
.await?;

if has_partitioned_tables {
error!(
"publication '{}' has publish_via_partition_root=false but contains partitioned table(s)",
self.config.publication_name
);

bail!(
ErrorKind::ConfigError,
"Invalid publication configuration for partitioned tables",
format!(
"The publication '{}' contains partitioned tables but has publish_via_partition_root=false. \
This configuration causes replication messages to use child partition OIDs, which are not \
tracked by the pipeline and will cause failures. Please recreate the publication with \
publish_via_partition_root=true or use: ALTER PUBLICATION {} SET (publish_via_partition_root = true);",
self.config.publication_name, self.config.publication_name
)
);
}
}

self.store.load_table_replication_states().await?;
let table_replication_states = self.store.get_table_replication_states().await?;

Expand Down
Loading