-
-
Couldn't load subscription status.
- Fork 67
feat(core): Add partitioned table support #410
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(core): Add partitioned table support #410
Conversation
Pull Request Test Coverage Report for Build 18839109103Details
💛 - Coveralls |
f5e3b11 to
12416f4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few questions:
- When
publish_via_partition_rootis false, do we copy the root table as well the the partitions? - If
publish_via_partition_rootis true, and a new partition is added, do we replicate it successfully? We don't have tests for this case.
|
|
||
| ```sql | ||
| -- Create publication with partitioned table support | ||
| CREATE PUBLICATION my_publication FOR TABLE users, orders WITH (publish_via_partition_root = true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: user lowercase SQL.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would we want to have lowercase in user facing docs? Most docs have the uppercase convention and I felt like it would be better to follow their lead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Our existing docs have mostly lowercase SQL. You can check it out in the docs folder.
| .await | ||
| .unwrap(); | ||
|
|
||
| let _ = pipeline.shutdown_and_wait().await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To test for an absence of events, we should insert one row into an existing partition after detaching & dropping the partition table and wait for that event to arrive. In the current form, the test could be passing because we shutdown the pipeline too quickly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, that's a fair comment. I will see what I can do.
| .await | ||
| .unwrap(); | ||
|
|
||
| let _ = pipeline.shutdown_and_wait().await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here.
|
|
@codex review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| /// Retrieves the OIDs of all tables included in a publication. | ||
| /// | ||
| /// For partitioned tables with `publish_via_partition_root=true`, this returns only the parent | ||
| /// table OID. The query uses a recursive CTE to walk up the partition inheritance hierarchy | ||
| /// and identify root tables that have no parent themselves. | ||
| pub async fn get_publication_table_ids( | ||
| &self, | ||
| publication_name: &str, | ||
| ) -> EtlResult<Vec<TableId>> { | ||
| let publication_query = format!( | ||
| "select c.oid from pg_publication_tables pt | ||
| join pg_class c on c.relname = pt.tablename | ||
| join pg_namespace n on n.oid = c.relnamespace AND n.nspname = pt.schemaname | ||
| where pt.pubname = {};", | ||
| quote_literal(publication_name) | ||
| let query = format!( | ||
| r#" | ||
| with recursive pub_tables as ( | ||
| -- Get all tables from publication (pg_publication_tables includes explicit tables, | ||
| -- ALL TABLES publications, and FOR TABLES IN SCHEMA publications) | ||
| select c.oid | ||
| from pg_publication_tables pt | ||
| join pg_class c on c.relname = pt.tablename | ||
| join pg_namespace n on n.oid = c.relnamespace and n.nspname = pt.schemaname | ||
| where pt.pubname = {pub} | ||
| ), | ||
| hierarchy(relid) as ( | ||
| -- Start with published tables | ||
| select oid from pub_tables | ||
| union | ||
| -- Recursively find parent tables in inheritance hierarchy | ||
| select i.inhparent | ||
| from pg_inherits i | ||
| join hierarchy h on h.relid = i.inhrelid | ||
| ) | ||
| -- Return only root tables (those without a parent) | ||
| select distinct relid as oid | ||
| from hierarchy | ||
| where not exists ( | ||
| select 1 from pg_inherits i where i.inhrelid = hierarchy.relid | ||
| ); | ||
| "#, | ||
| pub = quote_literal(publication_name) | ||
| ); | ||
|
|
||
| let mut table_ids = vec![]; | ||
| for msg in self.client.simple_query(&publication_query).await? { | ||
| let mut roots = vec![]; | ||
| for msg in self.client.simple_query(&query).await? { | ||
| if let SimpleQueryMessage::Row(row) = msg { | ||
| // For the sake of simplicity, we refer to the table oid as table id. | ||
| let table_id = Self::get_row_value::<TableId>(&row, "oid", "pg_class").await?; | ||
| table_ids.push(table_id); | ||
| roots.push(table_id); | ||
| } | ||
| } | ||
|
|
||
| Ok(table_ids) | ||
| Ok(roots) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Collapse child partitions even when publication publishes child OIDs
The new get_publication_table_ids query now always walks up pg_inherits and returns only root tables. When the publication was created with publish_via_partition_root = false (the PostgreSQL default), logical replication messages still contain the child partition OIDs. Because only the parent ID is returned here, the schema cache never contains entries for those child OIDs and handle_relation_message will raise MissingTableSchema as soon as a child relation message arrives (replication/apply.rs handle_relation_message). This turns what used to be a “no CDC after copy” scenario into a hard pipeline failure. Either avoid collapsing when pubviaroot is false or teach the apply loop to handle child OIDs gracefully.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have updated the code to now validate whether a publication has partitioned tables and if publish_via_partition_root=false the pipeline won't start.
Okay, does this mean only initial table copy will be done and no CDC on the root?
Cool, that's great. |
Exactly, that's the result. Since it's a weird behavior, I have added an additional check on startup that fails the pipeline when the setting is on |
Alright, that should work. We can refine this behaviour in future. |
This PR adds partitioned table support by treating a partitioned table as a single entity to replicate.
Implemented Behavior
Partition detachment handling:
publish_via_partition_roothandling:publish_via_partition_root=falsethe system will throw an error if there is at least one partitioned table in the publication.publish_via_partition_root=truethe system behaves as expected, treating the partitioned tables as one big table.Testing
Several tests have been added to verify the behavior functions correctly.
Requirements
Note:
FOR TABLES IN [schema]is only supported from Postgres 15+