diff --git a/docs/how-to/configure-postgres.md b/docs/how-to/configure-postgres.md index fe57cd07b..bf99db0a4 100644 --- a/docs/how-to/configure-postgres.md +++ b/docs/how-to/configure-postgres.md @@ -116,6 +116,28 @@ CREATE PUBLICATION all_tables FOR ALL TABLES; CREATE PUBLICATION inserts_only FOR TABLE users WITH (publish = 'insert'); ``` +#### Partitioned Tables + +If you want to replicate partitioned tables, you must use `publish_via_partition_root = true` when creating your publication. This option tells Postgres to treat the [partitioned table as a single table](https://www.postgresql.org/docs/current/sql-createpublication.html#SQL-CREATEPUBLICATION-PARAMS-WITH-PUBLISH-VIA-PARTITION-ROOT) from the replication perspective, rather than replicating each partition individually. All changes to any partition will be published as changes to the parent table: + +```sql +-- Create publication with partitioned table support +CREATE PUBLICATION my_publication FOR TABLE users, orders WITH (publish_via_partition_root = true); + +-- For all tables including partitioned tables +CREATE PUBLICATION all_tables FOR ALL TABLES WITH (publish_via_partition_root = true); +``` + +**Limitation:** If this option is enabled, `TRUNCATE` operations performed directly on individual partitions are not replicated. To replicate a truncate operation, you must execute it on the parent table instead: + +```sql +-- This will NOT be replicated +TRUNCATE TABLE orders_2024_q1; + +-- This WILL be replicated +TRUNCATE TABLE orders; +``` + ### Managing Publications ```sql diff --git a/etl-api/src/db/publications.rs b/etl-api/src/db/publications.rs index 38a4ab575..5c01ac859 100644 --- a/etl-api/src/db/publications.rs +++ b/etl-api/src/db/publications.rs @@ -43,6 +43,9 @@ pub async fn create_publication( } } + // Ensure partitioned tables publish via ancestor/root schema for logical replication + query.push_str(" with (publish_via_partition_root = true)"); + pool.execute(query.as_str()).await?; Ok(()) } diff --git a/etl-postgres/src/tokio/test_utils.rs b/etl-postgres/src/tokio/test_utils.rs index 6692e486b..5c6b76123 100644 --- a/etl-postgres/src/tokio/test_utils.rs +++ b/etl-postgres/src/tokio/test_utils.rs @@ -46,14 +46,16 @@ impl PgDatabase { self.server_version } - /// Creates a Postgres publication for the specified tables. + /// Creates a Postgres publication for the specified tables with an optional configuration + /// parameter. /// - /// Sets up logical replication by creating a publication that includes - /// the given tables for change data capture. - pub async fn create_publication( + /// This method is used for specific cases which should mutate the defaults when creating a + /// publication which is done only for a small subset of tests. + pub async fn create_publication_with_config( &self, publication_name: &str, table_names: &[TableName], + publish_via_partition_root: bool, ) -> Result<(), tokio_postgres::Error> { let table_names = table_names .iter() @@ -61,9 +63,10 @@ impl PgDatabase { .collect::>(); let create_publication_query = format!( - "create publication {} for table {}", + "create publication {} for table {} with (publish_via_partition_root = {})", publication_name, - table_names.join(", ") + table_names.join(", "), + publish_via_partition_root ); self.client .as_ref() @@ -74,6 +77,16 @@ impl PgDatabase { Ok(()) } + /// Creates a Postgres publication for the specified tables. 
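+ ///
+ /// This is a thin wrapper around [`create_publication_with_config`] that keeps the default of
+ /// `publish_via_partition_root = true`. The statement it issues is roughly (publication and
+ /// table names below are placeholders):
+ ///
+ /// ```sql
+ /// create publication my_pub for table my_table with (publish_via_partition_root = true);
+ /// ```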
+ pub async fn create_publication( + &self, + publication_name: &str, + table_names: &[TableName], + ) -> Result<(), tokio_postgres::Error> { + self.create_publication_with_config(publication_name, table_names, true) + .await + } + pub async fn create_publication_for_all( &self, publication_name: &str, @@ -87,9 +100,11 @@ impl PgDatabase { // PostgreSQL 15+ supports FOR ALL TABLES IN SCHEMA syntax let create_publication_query = match schema { Some(schema_name) => format!( - "create publication {publication_name} for tables in schema {schema_name}" + "create publication {publication_name} for tables in schema {schema_name} with (publish_via_partition_root = true)" + ), + None => format!( + "create publication {publication_name} for all tables with (publish_via_partition_root = true)" ), - None => format!("create publication {publication_name} for all tables"), }; client.execute(&create_publication_query, &[]).await?; @@ -115,8 +130,9 @@ impl PgDatabase { } } None => { - let create_publication_query = - format!("create publication {publication_name} for all tables"); + let create_publication_query = format!( + "create publication {publication_name} for all tables with (publish_via_partition_root = true)" + ); client.execute(&create_publication_query, &[]).await?; } } diff --git a/etl/src/pipeline.rs b/etl/src/pipeline.rs index e021886d4..36b4b1dac 100644 --- a/etl/src/pipeline.rs +++ b/etl/src/pipeline.rs @@ -300,6 +300,41 @@ where publication_table_ids.len() ); + // Validate that the publication is configured correctly for partitioned tables. + // + // When `publish_via_partition_root = false`, logical replication messages contain + // child partition OIDs instead of parent table OIDs. Since our schema cache only + // contains parent table IDs (from `get_publication_table_ids`), relation messages + // with child OIDs would cause pipeline failures. + let publish_via_partition_root = replication_client + .get_publish_via_partition_root(&self.config.publication_name) + .await?; + + if !publish_via_partition_root { + let has_partitioned_tables = replication_client + .has_partitioned_tables(&publication_table_ids) + .await?; + + if has_partitioned_tables { + error!( + "publication '{}' has publish_via_partition_root=false but contains partitioned table(s)", + self.config.publication_name + ); + + bail!( + ErrorKind::ConfigError, + "Invalid publication configuration for partitioned tables", + format!( + "The publication '{}' contains partitioned tables but has publish_via_partition_root=false. \ + This configuration causes replication messages to use child partition OIDs, which are not \ + tracked by the pipeline and will cause failures. Please recreate the publication with \ + publish_via_partition_root=true or use: ALTER PUBLICATION {} SET (publish_via_partition_root = true);", + self.config.publication_name, self.config.publication_name + ) + ); + } + } + self.store.load_table_replication_states().await?; let table_replication_states = self.store.get_table_replication_states().await?; diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index 2746f6c47..9797679e8 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -160,6 +160,14 @@ impl PgReplicationSlotTransaction { } } +/// Result of building publication filter SQL components. +struct PublicationFilter { + /// CTEs to include in the WITH clause (empty string if no publication filtering). 
+ ctes: String, + /// Predicate to include in the WHERE clause (empty string if no publication filtering). + predicate: String, +} + /// A client for interacting with Postgres's logical replication features. /// /// This client provides methods for creating replication slots, managing transactions, @@ -379,6 +387,59 @@ impl PgReplicationClient { Ok(false) } + /// Retrieves the `publish_via_partition_root` setting for a publication. + /// + /// Returns `true` if the publication is configured to send replication messages using + /// the parent table OID, or `false` if it sends them using child partition OIDs. + pub async fn get_publish_via_partition_root(&self, publication: &str) -> EtlResult { + let query = format!( + "select pubviaroot from pg_publication where pubname = {};", + quote_literal(publication) + ); + + for msg in self.client.simple_query(&query).await? { + if let SimpleQueryMessage::Row(row) = msg { + let pubviaroot = + Self::get_row_value::(&row, "pubviaroot", "pg_publication").await?; + return Ok(pubviaroot == "t"); + } + } + + bail!( + ErrorKind::ConfigError, + "Publication not found", + format!("Publication '{}' not found in database", publication) + ); + } + + /// Checks if any of the provided table IDs are partitioned tables. + /// + /// A partitioned table is one where `relkind = 'p'` in `pg_class`. + /// Returns `true` if at least one table is partitioned, `false` otherwise. + pub async fn has_partitioned_tables(&self, table_ids: &[TableId]) -> EtlResult { + if table_ids.is_empty() { + return Ok(false); + } + + let table_oids_list = table_ids + .iter() + .map(|id| id.0.to_string()) + .collect::>() + .join(", "); + + let query = format!( + "select 1 from pg_class where oid in ({table_oids_list}) and relkind = 'p' limit 1;" + ); + + for msg in self.client.simple_query(&query).await? { + if let SimpleQueryMessage::Row(_) = msg { + return Ok(true); + } + } + + Ok(false) + } + /// Retrieves the names of all tables included in a publication. pub async fn get_publication_table_names( &self, @@ -407,28 +468,55 @@ impl PgReplicationClient { } /// Retrieves the OIDs of all tables included in a publication. + /// + /// For partitioned tables with `publish_via_partition_root=true`, this returns only the parent + /// table OID. The query uses a recursive CTE to walk up the partition inheritance hierarchy + /// and identify root tables that have no parent themselves. 
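+ ///
+ /// For example (table names illustrative), if a publication contains any level of the
+ /// hierarchy `events` -> `events_2024` -> `events_2024_q1`, only the OID of the root
+ /// `events` table is returned.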
pub async fn get_publication_table_ids( &self, publication_name: &str, ) -> EtlResult> { - let publication_query = format!( - "select c.oid from pg_publication_tables pt - join pg_class c on c.relname = pt.tablename - join pg_namespace n on n.oid = c.relnamespace AND n.nspname = pt.schemaname - where pt.pubname = {};", - quote_literal(publication_name) + let query = format!( + r#" + with recursive pub_tables as ( + -- Get all tables from publication (pg_publication_tables includes explicit tables, + -- ALL TABLES publications, and FOR TABLES IN SCHEMA publications) + select c.oid + from pg_publication_tables pt + join pg_class c on c.relname = pt.tablename + join pg_namespace n on n.oid = c.relnamespace and n.nspname = pt.schemaname + where pt.pubname = {pub} + ), + hierarchy(relid) as ( + -- Start with published tables + select oid from pub_tables + + union + + -- Recursively find parent tables in inheritance hierarchy + select i.inhparent + from pg_inherits i + join hierarchy h on h.relid = i.inhrelid + ) + -- Return only root tables (those without a parent) + select distinct relid as oid + from hierarchy + where not exists ( + select 1 from pg_inherits i where i.inhrelid = hierarchy.relid + ); + "#, + pub = quote_literal(publication_name) ); - let mut table_ids = vec![]; - for msg in self.client.simple_query(&publication_query).await? { + let mut roots = vec![]; + for msg in self.client.simple_query(&query).await? { if let SimpleQueryMessage::Row(row) = msg { - // For the sake of simplicity, we refer to the table oid as table id. let table_id = Self::get_row_value::(&row, "oid", "pg_class").await?; - table_ids.push(table_id); + roots.push(table_id); } } - Ok(table_ids) + Ok(roots) } /// Starts a logical replication stream from the specified publication and slot. @@ -633,6 +721,85 @@ impl PgReplicationClient { ); } + /// Builds SQL fragments for filtering columns based on publication settings. 
+ /// + /// Returns CTEs and predicates that filter columns according to: + /// - Postgres 15+: Column-level filtering using `prattrs` + /// - Postgres 14 and earlier: Table-level filtering only + /// - No publication: No filtering (empty strings) + fn build_publication_filter_sql( + &self, + table_id: TableId, + publication_name: Option<&str>, + ) -> PublicationFilter { + let Some(publication_name) = publication_name else { + return PublicationFilter { + ctes: String::new(), + predicate: String::new(), + }; + }; + + // Postgres 15+ supports column-level filtering via prattrs + if let Some(server_version) = self.server_version + && server_version.get() >= 150000 + { + return PublicationFilter { + ctes: format!( + "pub_info as ( + select p.oid as puboid, p.puballtables, r.prattrs + from pg_publication p + left join pg_publication_rel r on r.prpubid = p.oid and r.prrelid = {table_id} + where p.pubname = {publication} + ), + pub_attrs as ( + select unnest(prattrs) as attnum + from pub_info + where prattrs is not null + ), + pub_schema as ( + select 1 as exists_in_schema_pub + from pub_info + join pg_publication_namespace pn on pn.pnpubid = pub_info.puboid + join pg_class c on c.relnamespace = pn.pnnspid + where c.oid = {table_id} + ),", + publication = quote_literal(publication_name), + ), + predicate: "and ( + (select puballtables from pub_info) = true + or (select count(*) from pub_schema) > 0 + or ( + case (select count(*) from pub_attrs) + when 0 then true + else (a.attnum in (select attnum from pub_attrs)) + end + ) + )" + .to_string(), + }; + } + + // Postgres 14 and earlier: table-level filtering only + PublicationFilter { + ctes: format!( + "pub_info as ( + select p.puballtables + from pg_publication p + where p.pubname = {publication} + ), + pub_table as ( + select 1 as exists_in_pub + from pg_publication_rel r + join pg_publication p on r.prpubid = p.oid + where p.pubname = {publication} + and r.prrelid = {table_id} + ),", + publication = quote_literal(publication_name), + ), + predicate: "and ((select puballtables from pub_info) = true or (select count(*) from pub_table) > 0)".to_string(), + } + } + /// Retrieves schema information for all columns in a table. /// /// If a publication is specified, only columns included in that publication @@ -642,71 +809,62 @@ impl PgReplicationClient { table_id: TableId, publication: Option<&str>, ) -> EtlResult> { - let (pub_cte, pub_pred) = if let Some(publication) = publication { - if let Some(server_version) = self.server_version - && server_version.get() >= 150000 - { - ( - format!( - "with pub_attrs as ( - select unnest(r.prattrs) - from pg_publication_rel r - left join pg_publication p on r.prpubid = p.oid - where p.pubname = {publication} - and r.prrelid = {table_id} - )", - publication = quote_literal(publication), - ), - "and ( - case (select count(*) from pub_attrs) - when 0 then true - else (a.attnum in (select * from pub_attrs)) - end - )", - ) - } else { - // Postgres 14 or earlier or unknown, fallback to no column-level filtering - ( - format!( - "with pub_table as ( - select 1 as exists_in_pub - from pg_publication_rel r - left join pg_publication p on r.prpubid = p.oid - where p.pubname = {publication} - and r.prrelid = {table_id} - )", - publication = quote_literal(publication), - ), - "and (select count(*) from pub_table) > 0", - ) - } - } else { - ("".into(), "") - }; + // Build publication filter CTEs and predicates based on Postgres version. 
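+ // On Postgres 15+ the filter can also restrict columns via `prattrs`; on 14 and earlier it
+ // only checks that the table itself is part of the publication.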
+ let publication_filter = self.build_publication_filter_sql(table_id, publication); let column_info_query = format!( - "{pub_cte} - select a.attname, + r#" + with {publication_ctes} + -- Find the direct parent table (for child partitions) + direct_parent as ( + select i.inhparent as parent_oid + from pg_inherits i + where i.inhrelid = {table_id} + limit 1 + ), + -- Extract primary key column names from the parent table + parent_pk_cols as ( + select array_agg(a.attname order by x.n) as pk_column_names + from pg_constraint con + join unnest(con.conkey) with ordinality as x(attnum, n) on true + join pg_attribute a on a.attrelid = con.conrelid and a.attnum = x.attnum + join direct_parent dp on con.conrelid = dp.parent_oid + where con.contype = 'p' + group by con.conname + ) + select + a.attname, a.atttypid, a.atttypmod, a.attnotnull, - coalesce(i.indisprimary, false) as primary + case + -- Check if column has a direct primary key index + when coalesce(i.indisprimary, false) = true then true + -- Check if column name matches parent's primary key (for partitions) + when exists ( + select 1 + from parent_pk_cols pk + where a.attname = any(pk.pk_column_names) + ) then true + else false + end as primary from pg_attribute a left join pg_index i on a.attrelid = i.indrelid and a.attnum = any(i.indkey) and i.indisprimary = true where a.attnum > 0::int2 - and not a.attisdropped - and a.attgenerated = '' - and a.attrelid = {table_id} - {pub_pred} + and not a.attisdropped + and a.attgenerated = '' + and a.attrelid = {table_id} + {publication_predicate} order by a.attnum - ", + "#, + publication_ctes = publication_filter.ctes, + publication_predicate = publication_filter.predicate, ); let mut column_schemas = vec![]; - for message in self.client.simple_query(&column_info_query).await? { if let SimpleQueryMessage::Row(row) = message { let name = Self::get_row_value::(&row, "attname", "pg_attribute").await?; @@ -807,9 +965,9 @@ impl PgReplicationClient { ) } else { format!( - r#"copy {} ({}) to stdout with (format text);"#, - table_name.as_quoted_identifier(), + r#"copy (select {} from {}) to stdout with (format text);"#, column_list, + table_name.as_quoted_identifier(), ) }; diff --git a/etl/src/test_utils/event.rs b/etl/src/test_utils/event.rs index d05615f5f..b30361877 100644 --- a/etl/src/test_utils/event.rs +++ b/etl/src/test_utils/event.rs @@ -22,15 +22,20 @@ pub fn group_events_by_type_and_table_id( for event in events { let event_type = EventType::from(event); // This grouping only works on simple DML operations. 
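+ // Truncate events carry the ids of every truncated table, so a single truncate may be
+ // grouped under several (event type, table id) keys.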
- let table_id = match event { - Event::Insert(event) => Some(event.table_id), - Event::Update(event) => Some(event.table_id), - Event::Delete(event) => Some(event.table_id), - _ => None, + let table_ids = match event { + Event::Insert(event) => vec![event.table_id], + Event::Update(event) => vec![event.table_id], + Event::Delete(event) => vec![event.table_id], + Event::Truncate(event) => event + .rel_ids + .iter() + .map(|rel_id| TableId::new(*rel_id)) + .collect(), + _ => vec![], }; - if let Some(table_id) = table_id { + for table_id in table_ids { grouped - .entry((event_type, table_id)) + .entry((event_type.clone(), table_id)) .or_insert_with(Vec::new) .push(event.clone()); } diff --git a/etl/src/test_utils/test_schema.rs b/etl/src/test_utils/test_schema.rs index 8faa6449b..2e4047fc5 100644 --- a/etl/src/test_utils/test_schema.rs +++ b/etl/src/test_utils/test_schema.rs @@ -127,6 +127,77 @@ pub async fn setup_test_database_schema( } } +/// Creates a partitioned table with the given name and partitions. +/// +/// This function creates: +/// 1. A parent partitioned table with a primary key +/// 2. Several child partitions based on the provided partition specifications +/// +/// Returns the table ID of the parent table and a list of partition table IDs. +pub async fn create_partitioned_table( + database: &PgDatabase, + table_name: TableName, + partition_specs: &[(&str, &str)], // (partition_name, partition_constraint) +) -> Result<(TableId, Vec), tokio_postgres::Error> { + let create_parent_query = format!( + "create table {} ( + id bigserial, + data text NOT NULL, + partition_key integer NOT NULL, + primary key (id, partition_key) + ) partition by range (partition_key)", + table_name.as_quoted_identifier() + ); + + database.run_sql(&create_parent_query).await?; + + let parent_row = database + .client + .as_ref() + .unwrap() + .query_one( + "select c.oid from pg_class c join pg_namespace n on n.oid = c.relnamespace + where n.nspname = $1 and c.relname = $2", + &[&table_name.schema, &table_name.name], + ) + .await?; + + let parent_table_id: TableId = parent_row.get(0); + let mut partition_table_ids = Vec::new(); + + for (partition_name, partition_constraint) in partition_specs { + let partition_table_name = TableName::new( + table_name.schema.clone(), + format!("{}_{}", table_name.name, partition_name), + ); + + let create_partition_query = format!( + "create table {} partition of {} for values {}", + partition_table_name.as_quoted_identifier(), + table_name.as_quoted_identifier(), + partition_constraint + ); + + database.run_sql(&create_partition_query).await?; + + let partition_row = database + .client + .as_ref() + .unwrap() + .query_one( + "select c.oid from pg_class c join pg_namespace n on n.oid = c.relnamespace + where n.nspname = $1 and c.relname = $2", + &[&partition_table_name.schema, &partition_table_name.name], + ) + .await?; + + let partition_table_id: TableId = partition_row.get(0); + partition_table_ids.push(partition_table_id); + } + + Ok((parent_table_id, partition_table_ids)) +} + /// Inserts users data into the database for testing purposes. pub async fn insert_users_data( client: &mut PgDatabase, diff --git a/etl/src/types/mod.rs b/etl/src/types/mod.rs index 05103904c..867af9a23 100644 --- a/etl/src/types/mod.rs +++ b/etl/src/types/mod.rs @@ -13,7 +13,7 @@ pub use event::*; pub use pipeline::*; pub use table_row::*; -pub use crate::conversions::numeric::PgNumeric; +pub use crate::conversions::numeric::{PgNumeric, Sign}; // Re-exports. 
pub use etl_postgres::types::*; diff --git a/etl/tests/pipeline_with_partitioned_table.rs b/etl/tests/pipeline_with_partitioned_table.rs new file mode 100644 index 000000000..f096bff1f --- /dev/null +++ b/etl/tests/pipeline_with_partitioned_table.rs @@ -0,0 +1,1430 @@ +#![cfg(feature = "test-utils")] + +use etl::destination::memory::MemoryDestination; +use etl::error::ErrorKind; +use etl::state::table::TableReplicationPhaseType; +use etl::test_utils::database::{spawn_source_database, test_table_name}; +use etl::test_utils::event::group_events_by_type_and_table_id; +use etl::test_utils::notify::NotifyingStore; +use etl::test_utils::pipeline::create_pipeline; +use etl::test_utils::test_destination_wrapper::TestDestinationWrapper; +use etl::test_utils::test_schema::create_partitioned_table; +use etl::types::EventType; +use etl::types::PipelineId; +use etl::types::TableId; +use etl_telemetry::tracing::init_test_tracing; +use rand::random; +use tokio_postgres::types::Type; + +/// Tests that initial COPY replicates all rows from a partitioned table. +/// Only the parent table is tracked, not individual child partitions. +#[tokio::test(flavor = "multi_thread")] +async fn partitioned_table_copy_replicates_existing_data() { + init_test_tracing(); + let database = spawn_source_database().await; + + let table_name = test_table_name("partitioned_events"); + let partition_specs = [ + ("p1", "from (1) to (100)"), + ("p2", "from (100) to (200)"), + ("p3", "from (200) to (300)"), + ]; + + let (parent_table_id, _partition_table_ids) = + create_partitioned_table(&database, table_name.clone(), &partition_specs) + .await + .unwrap(); + + database + .run_sql(&format!( + "insert into {} (data, partition_key) values ('event1', 50), ('event2', 150), ('event3', 250)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + let publication_name = "test_partitioned_pub".to_string(); + database + .create_publication(&publication_name, std::slice::from_ref(&table_name)) + .await + .unwrap(); + + let state_store = NotifyingStore::new(); + let destination = TestDestinationWrapper::wrap(MemoryDestination::new()); + + // Register notification for initial copy completion. + let parent_sync_done = state_store + .notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone) + .await; + + let pipeline_id: PipelineId = random(); + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + publication_name, + state_store.clone(), + destination.clone(), + ); + + pipeline.start().await.unwrap(); + + parent_sync_done.notified().await; + + let _ = pipeline.shutdown_and_wait().await; + + // Verify table schema was discovered correctly. + let table_schemas = state_store.get_table_schemas().await; + assert!(table_schemas.contains_key(&parent_table_id)); + + let parent_schema = &table_schemas[&parent_table_id]; + assert_eq!(parent_schema.id, parent_table_id); + assert_eq!(parent_schema.name, table_name); + + // Verify columns are correctly discovered. + assert_eq!(parent_schema.column_schemas.len(), 3); + + // Check id column (added by default). + let id_column = &parent_schema.column_schemas[0]; + assert_eq!(id_column.name, "id"); + assert_eq!(id_column.typ, Type::INT8); + assert!(!id_column.nullable); + assert!(id_column.primary); + + // Check data column. + let data_column = &parent_schema.column_schemas[1]; + assert_eq!(data_column.name, "data"); + assert_eq!(data_column.typ, Type::TEXT); + assert!(!data_column.nullable); + assert!(!data_column.primary); + + // Check partition_key column. 
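+ // partition_key is part of the composite primary key because Postgres requires the
+ // partition key columns to be included in any primary key on a partitioned table.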
+ let partition_key_column = &parent_schema.column_schemas[2]; + assert_eq!(partition_key_column.name, "partition_key"); + assert_eq!(partition_key_column.typ, Type::INT4); + assert!(!partition_key_column.nullable); + assert!(partition_key_column.primary); + + let table_rows = destination.get_table_rows().await; + let total_rows: usize = table_rows.values().map(|rows| rows.len()).sum(); + assert_eq!(total_rows, 3); + + let table_states = state_store.get_table_replication_states().await; + assert!(table_states.contains_key(&parent_table_id)); + assert_eq!(table_states.len(), 1); + + let parent_table_rows = table_rows + .iter() + .filter(|(table_id, _)| **table_id == parent_table_id) + .map(|(_, rows)| rows.len()) + .sum::(); + assert_eq!(parent_table_rows, 3); +} + +/// Tests that CDC streams inserts to partitions created after pipeline startup. +/// New partitions are automatically included without publication changes. +#[tokio::test(flavor = "multi_thread")] +async fn partitioned_table_copy_and_streams_new_data_from_new_partition() { + init_test_tracing(); + let database = spawn_source_database().await; + + let table_name = test_table_name("partitioned_events_late"); + let initial_partition_specs = [("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")]; + + let (parent_table_id, _initial_partition_table_ids) = + create_partitioned_table(&database, table_name.clone(), &initial_partition_specs) + .await + .unwrap(); + + database + .run_sql(&format!( + "insert into {} (data, partition_key) values ('event1', 50), ('event2', 150)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + let publication_name = "test_partitioned_pub_late".to_string(); + database + .create_publication(&publication_name, std::slice::from_ref(&table_name)) + .await + .unwrap(); + + let state_store = NotifyingStore::new(); + let destination = TestDestinationWrapper::wrap(MemoryDestination::new()); + + // Register notification for initial copy completion. + let parent_sync_done = state_store + .notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone) + .await; + + let pipeline_id: PipelineId = random(); + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + publication_name, + state_store.clone(), + destination.clone(), + ); + + pipeline.start().await.unwrap(); + + parent_sync_done.notified().await; + + let new_partition_name = format!("{}_{}", table_name.name, "p3"); + let new_partition_qualified_name = format!("{}.{}", table_name.schema, new_partition_name); + database + .run_sql(&format!( + "create table {} partition of {} for values from (200) to (300)", + new_partition_qualified_name, + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + // Wait for CDC to deliver the new row. 
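+ // The waiter is registered before the insert below so that awaiting it observes the CDC
+ // event produced by that row.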
+ let inserts_notify = destination + .wait_for_events_count(vec![(EventType::Insert, 1)]) + .await; + + database + .run_sql(&format!( + "insert into {} (data, partition_key) values ('event3', 250)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + inserts_notify.notified().await; + + let _ = pipeline.shutdown_and_wait().await; + + let table_rows = destination.get_table_rows().await; + let parent_table_rows = table_rows + .iter() + .filter(|(table_id, _)| **table_id == parent_table_id) + .map(|(_, rows)| rows.len()) + .sum::(); + assert_eq!(parent_table_rows, 2); + + let events = destination.get_events().await; + let grouped = group_events_by_type_and_table_id(&events); + let parent_inserts = grouped + .get(&(EventType::Insert, parent_table_id)) + .cloned() + .unwrap_or_default(); + assert_eq!(parent_inserts.len(), 1); +} + +/// Tests that detaching and dropping a partition does not emit DELETE or TRUNCATE events. +/// Partition management is a DDL operation, not DML, so no data events should be generated. +#[tokio::test(flavor = "multi_thread")] +async fn partition_drop_does_not_emit_delete_or_truncate() { + init_test_tracing(); + let database = spawn_source_database().await; + + let table_name = test_table_name("partitioned_events_drop"); + let partition_specs = [("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")]; + + let (parent_table_id, _partition_table_ids) = + create_partitioned_table(&database, table_name.clone(), &partition_specs) + .await + .unwrap(); + + database + .run_sql(&format!( + "insert into {} (data, partition_key) values ('event1', 50), ('event2', 150)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + let publication_name = "test_partitioned_pub_drop".to_string(); + database + .create_publication(&publication_name, std::slice::from_ref(&table_name)) + .await + .unwrap(); + + let state_store = NotifyingStore::new(); + let destination = TestDestinationWrapper::wrap(MemoryDestination::new()); + + let parent_sync_done = state_store + .notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone) + .await; + + let pipeline_id: PipelineId = random(); + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + publication_name, + state_store.clone(), + destination.clone(), + ); + + pipeline.start().await.unwrap(); + parent_sync_done.notified().await; + + let events_before = destination.get_events().await; + let grouped_before = group_events_by_type_and_table_id(&events_before); + let delete_count_before = grouped_before + .get(&(EventType::Delete, parent_table_id)) + .map(|v| v.len()) + .unwrap_or(0); + let truncate_count_before = grouped_before + .get(&(EventType::Truncate, parent_table_id)) + .map(|v| v.len()) + .unwrap_or(0); + + // Detach and drop one child partition (DDL should not generate DML events). + let partition_p1_name = format!("{}_{}", table_name.name, "p1"); + let partition_p1_qualified = format!("{}.{}", table_name.schema, partition_p1_name); + database + .run_sql(&format!( + "alter table {} detach partition {}", + table_name.as_quoted_identifier(), + partition_p1_qualified + )) + .await + .unwrap(); + database + .run_sql(&format!("drop table {partition_p1_qualified}")) + .await + .unwrap(); + + // Insert a row into an existing partition to ensure the pipeline is still processing events. 
+ let inserts_notify = destination + .wait_for_events_count(vec![(EventType::Insert, 1)]) + .await; + + database + .run_sql(&format!( + "insert into {} (data, partition_key) values ('event3', 150)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + inserts_notify.notified().await; + + let _ = pipeline.shutdown_and_wait().await; + + let events_after = destination.get_events().await; + let grouped_after = group_events_by_type_and_table_id(&events_after); + let delete_count_after = grouped_after + .get(&(EventType::Delete, parent_table_id)) + .map(|v| v.len()) + .unwrap_or(0); + let truncate_count_after = grouped_after + .get(&(EventType::Truncate, parent_table_id)) + .map(|v| v.len()) + .unwrap_or(0); + + assert_eq!(delete_count_after, delete_count_before); + assert_eq!(truncate_count_after, truncate_count_before); +} + +/// Tests that issuing a TRUNCATE at the parent table level does emit a TRUNCATE event in the +/// replication stream. +#[tokio::test(flavor = "multi_thread")] +async fn parent_table_truncate_does_emit_truncate_event() { + init_test_tracing(); + let database = spawn_source_database().await; + + let table_name = test_table_name("partitioned_events_truncate"); + let partition_specs = [("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")]; + + let (parent_table_id, _partition_table_ids) = + create_partitioned_table(&database, table_name.clone(), &partition_specs) + .await + .unwrap(); + + database + .run_sql(&format!( + "insert into {} (data, partition_key) values ('event1', 50), ('event2', 150)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + let publication_name = "test_partitioned_pub_truncate".to_string(); + database + .create_publication(&publication_name, std::slice::from_ref(&table_name)) + .await + .unwrap(); + + let state_store = NotifyingStore::new(); + let destination = TestDestinationWrapper::wrap(MemoryDestination::new()); + + let parent_sync_done = state_store + .notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone) + .await; + + let pipeline_id: PipelineId = random(); + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + publication_name, + state_store.clone(), + destination.clone(), + ); + + pipeline.start().await.unwrap(); + + parent_sync_done.notified().await; + + // Wait for the parent table truncate to be replicated. + let truncate_notify = destination + .wait_for_events_count(vec![(EventType::Truncate, 1)]) + .await; + + // We truncate the parent table. + database + .run_sql(&format!( + "truncate table {}", + table_name.as_quoted_identifier(), + )) + .await + .unwrap(); + + truncate_notify.notified().await; + + let _ = pipeline.shutdown_and_wait().await; + + let events = destination.get_events().await; + let grouped_events = group_events_by_type_and_table_id(&events); + let truncate_count = grouped_events + .get(&(EventType::Truncate, parent_table_id)) + .map(|v| v.len()) + .unwrap_or(0); + + assert_eq!(truncate_count, 1); +} + +/// Tests that issuing a TRUNCATE at the child table level does NOT emit a TRUNCATE event in the +/// replication stream. 
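+///
+/// This mirrors the limitation documented in `docs/how-to/configure-postgres.md`: with
+/// `publish_via_partition_root = true`, a `TRUNCATE` executed directly on an individual
+/// partition is not published; only truncating the parent table is replicated.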
+#[tokio::test(flavor = "multi_thread")] +async fn child_table_truncate_does_not_emit_truncate_event() { + init_test_tracing(); + let database = spawn_source_database().await; + + let table_name = test_table_name("partitioned_events_truncate"); + let partition_specs = [("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")]; + + let (parent_table_id, _partition_table_ids) = + create_partitioned_table(&database, table_name.clone(), &partition_specs) + .await + .unwrap(); + + database + .run_sql(&format!( + "insert into {} (data, partition_key) values ('event1', 50), ('event2', 150)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + let publication_name = "test_partitioned_pub_truncate".to_string(); + database + .create_publication(&publication_name, std::slice::from_ref(&table_name)) + .await + .unwrap(); + + let state_store = NotifyingStore::new(); + let destination = TestDestinationWrapper::wrap(MemoryDestination::new()); + + let parent_sync_done = state_store + .notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone) + .await; + + let pipeline_id: PipelineId = random(); + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + publication_name, + state_store.clone(), + destination.clone(), + ); + + pipeline.start().await.unwrap(); + + parent_sync_done.notified().await; + + // We truncate the child table. + let partition_p1_name = format!("{}_{}", table_name.name, "p1"); + let partition_p1_qualified = format!("{}.{}", table_name.schema, partition_p1_name); + database + .run_sql(&format!("truncate table {partition_p1_qualified}")) + .await + .unwrap(); + + // Insert a row into an existing partition to ensure the pipeline is still processing events. + let inserts_notify = destination + .wait_for_events_count(vec![(EventType::Insert, 1)]) + .await; + + database + .run_sql(&format!( + "insert into {} (data, partition_key) values ('event3', 150)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + inserts_notify.notified().await; + + let _ = pipeline.shutdown_and_wait().await; + + let events = destination.get_events().await; + let grouped_events = group_events_by_type_and_table_id(&events); + let truncate_count = grouped_events + .get(&(EventType::Truncate, parent_table_id)) + .map(|v| v.len()) + .unwrap_or(0); + + assert_eq!(truncate_count, 0); +} + +/// Tests that detached partitions are not replicated with explicit publications. +/// Once detached, the partition becomes independent and is not in the publication since +/// only the parent table was explicitly added. Inserts to detached partitions are not replicated. +#[tokio::test(flavor = "multi_thread")] +async fn partition_detach_with_explicit_publication_does_not_replicate_detached_inserts() { + init_test_tracing(); + let database = spawn_source_database().await; + + let table_name = test_table_name("partitioned_events_detach"); + let partition_specs = [("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")]; + + let (parent_table_id, partition_table_ids) = + create_partitioned_table(&database, table_name.clone(), &partition_specs) + .await + .unwrap(); + + let p1_table_id = partition_table_ids[0]; + + // Insert initial data into both partitions. + database + .run_sql(&format!( + "insert into {} (data, partition_key) values ('event1', 50), ('event2', 150)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + // Create explicit publication for parent table only. 
+ let publication_name = "test_partitioned_pub_detach".to_string(); + database + .create_publication(&publication_name, std::slice::from_ref(&table_name)) + .await + .unwrap(); + + let state_store = NotifyingStore::new(); + let destination = TestDestinationWrapper::wrap(MemoryDestination::new()); + + let parent_sync_done = state_store + .notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone) + .await; + + let pipeline_id: PipelineId = random(); + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + publication_name.clone(), + state_store.clone(), + destination.clone(), + ); + + pipeline.start().await.unwrap(); + parent_sync_done.notified().await; + + // Verify initial sync copied both rows. + let table_rows = destination.get_table_rows().await; + assert_eq!(table_rows.len(), 1); + let parent_rows: usize = table_rows + .get(&parent_table_id) + .map(|rows| rows.len()) + .unwrap_or(0); + assert_eq!(parent_rows, 2); + + // Detach partition p1 from parent. + let partition_p1_name = format!("{}_{}", table_name.name, "p1"); + let partition_p1_qualified = format!("{}.{}", table_name.schema, partition_p1_name); + database + .run_sql(&format!( + "alter table {} detach partition {}", + table_name.as_quoted_identifier(), + partition_p1_qualified + )) + .await + .unwrap(); + + // Insert into the detached partition (should NOT be replicated). + database + .run_sql(&format!( + "insert into {partition_p1_qualified} (data, partition_key) values ('detached_event', 25)" + )) + .await + .unwrap(); + + // Wait for the parent table insert to be replicated. + let inserts_notify = destination + .wait_for_events_count(vec![(EventType::Insert, 1)]) + .await; + + // Insert into the parent table (should be replicated to remaining partition p2). + database + .run_sql(&format!( + "insert into {} (data, partition_key) values ('parent_event', 125)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + inserts_notify.notified().await; + + let _ = pipeline.shutdown_and_wait().await; + + // Verify events + let events = destination.get_events().await; + let grouped = group_events_by_type_and_table_id(&events); + + // Parent table should have 1 insert event (the insert after detachment). + let parent_inserts = grouped + .get(&(EventType::Insert, parent_table_id)) + .cloned() + .unwrap_or_default(); + assert_eq!(parent_inserts.len(), 1); + + // Detached partition should have NO insert events. + let detached_inserts = grouped + .get(&(EventType::Insert, p1_table_id)) + .cloned() + .unwrap_or_default(); + assert_eq!(detached_inserts.len(), 0); +} + +/// Tests catalog state when a partition is detached with FOR ALL TABLES publication. +/// The detached partition appears in pg_publication_tables but is not automatically discovered +/// by the running pipeline. Table discovery only happens at pipeline startup, not during execution. +#[tokio::test(flavor = "multi_thread")] +async fn partition_detach_with_all_tables_publication_does_not_replicate_detached_inserts() { + init_test_tracing(); + let database = spawn_source_database().await; + + let table_name = test_table_name("partitioned_events_all_tables"); + let partition_specs = [("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")]; + + let (parent_table_id, partition_table_ids) = + create_partitioned_table(&database, table_name.clone(), &partition_specs) + .await + .unwrap(); + + let p1_table_id = partition_table_ids[0]; + + // Insert initial data. 
+ database + .run_sql(&format!( + "insert into {} (data, partition_key) values ('event1', 50), ('event2', 150)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + // Create FOR ALL TABLES publication. + let publication_name = "test_all_tables_pub_detach".to_string(); + database + .create_publication_for_all(&publication_name, None) + .await + .unwrap(); + + let state_store = NotifyingStore::new(); + let destination = TestDestinationWrapper::wrap(MemoryDestination::new()); + + let parent_sync_done = state_store + .notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone) + .await; + + let pipeline_id: PipelineId = random(); + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + publication_name.clone(), + state_store.clone(), + destination.clone(), + ); + + pipeline.start().await.unwrap(); + parent_sync_done.notified().await; + + // Verify the initial state. The parent table is the only table tracked. + let table_states_before = state_store.get_table_replication_states().await; + assert!(table_states_before.contains_key(&parent_table_id)); + assert!(!table_states_before.contains_key(&p1_table_id)); + + // Detach partition p1. + let partition_p1_name = format!("{}_{}", table_name.name, "p1"); + let partition_p1_qualified = format!("{}.{}", table_name.schema, partition_p1_name); + database + .run_sql(&format!( + "alter table {} detach partition {}", + table_name.as_quoted_identifier(), + partition_p1_qualified + )) + .await + .unwrap(); + + // Verify catalog state. The detached partition is now a standalone table. + let inherits_check = database + .client + .as_ref() + .unwrap() + .query( + "select count(*) as cnt from pg_inherits where inhrelid = $1", + &[&p1_table_id.0], + ) + .await + .unwrap(); + let inherits_count: i64 = inherits_check[0].get("cnt"); + assert_eq!(inherits_count, 0); + + // Check pg_publication_tables. With FOR ALL TABLES, the detached partition should appear. + let pub_tables_check = database + .client + .as_ref() + .unwrap() + .query( + "select count(*) as cnt from pg_publication_tables + where pubname = $1 and tablename = $2", + &[&publication_name, &partition_p1_name], + ) + .await + .unwrap(); + let pub_tables_count: i64 = pub_tables_check[0].get("cnt"); + assert_eq!(pub_tables_count, 1); + + // Insert into detached partition. + database + .run_sql(&format!( + "insert into {partition_p1_qualified} (data, partition_key) values ('detached_event', 25)" + )) + .await + .unwrap(); + + // Note: The running pipeline won't automatically discover the detached partition + // without re-scanning for new tables. This is expected behavior, the table discovery + // happens at pipeline start or explicit refresh. + + let _ = pipeline.shutdown_and_wait().await; + + // The pipeline state should still only track the parent table (not the detached partition) + // because it hasn't re-scanned for new tables. + let table_states_after = state_store.get_table_replication_states().await; + assert!(table_states_after.contains_key(&parent_table_id)); + + // The detached partition insert should NOT be replicated in this pipeline run + // because the pipeline hasn't discovered it as a new table. 
+ let events = destination.get_events().await; + let grouped = group_events_by_type_and_table_id(&events); + let detached_inserts = grouped + .get(&(EventType::Insert, p1_table_id)) + .cloned() + .unwrap_or_default(); + assert_eq!(detached_inserts.len(), 0); +} + +/// Tests that a detached partition is discovered as a new table after pipeline restart. +/// With FOR ALL TABLES publication, the detached partition is re-discovered during table +/// scanning at startup and its data is replicated. +#[tokio::test(flavor = "multi_thread")] +async fn partition_detach_with_all_tables_publication_does_replicate_detached_inserts_on_restart() { + init_test_tracing(); + let database = spawn_source_database().await; + + let table_name = test_table_name("partitioned_events_restart"); + let partition_specs = [("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")]; + + let (parent_table_id, partition_table_ids) = + create_partitioned_table(&database, table_name.clone(), &partition_specs) + .await + .unwrap(); + + let p1_table_id = partition_table_ids[0]; + + // Insert initial data. + database + .run_sql(&format!( + "insert into {} (data, partition_key) values ('event1', 50), ('event2', 150)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + // Create FOR ALL TABLES publication. + let publication_name = "test_all_tables_restart".to_string(); + database + .create_publication_for_all(&publication_name, None) + .await + .unwrap(); + + let state_store = NotifyingStore::new(); + let destination = TestDestinationWrapper::wrap(MemoryDestination::new()); + + // Start pipeline and wait for initial sync. + let parent_sync_done = state_store + .notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone) + .await; + + let pipeline_id: PipelineId = random(); + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + publication_name.clone(), + state_store.clone(), + destination.clone(), + ); + + pipeline.start().await.unwrap(); + parent_sync_done.notified().await; + + // Verify the initial state. The parent table is the only table tracked. + let table_states_before = state_store.get_table_replication_states().await; + assert!(table_states_before.contains_key(&parent_table_id)); + assert!(!table_states_before.contains_key(&p1_table_id)); + + // Detach partition p1. + let partition_p1_name = format!("{}_{}", table_name.name, "p1"); + let partition_p1_qualified = format!("{}.{}", table_name.schema, partition_p1_name); + database + .run_sql(&format!( + "alter table {} detach partition {}", + table_name.as_quoted_identifier(), + partition_p1_qualified + )) + .await + .unwrap(); + + // Insert into detached partition (while pipeline is stopped). + database + .run_sql(&format!( + "insert into {partition_p1_qualified} (data, partition_key) values ('detached_event', 25)" + )) + .await + .unwrap(); + + // Shutdown the pipeline. + let _ = pipeline.shutdown_and_wait().await; + + // Restart the pipeline. It should now discover the detached partition as a new table. + let detached_sync_done = state_store + .notify_on_table_state_type(p1_table_id, TableReplicationPhaseType::SyncDone) + .await; + + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + publication_name.clone(), + state_store.clone(), + destination.clone(), + ); + + pipeline.start().await.unwrap(); + + // Wait for the detached partition to be synced. 
+ detached_sync_done.notified().await; + + let _ = pipeline.shutdown_and_wait().await; + + // Verify the detached partition was discovered and synced. + let table_states_after = state_store.get_table_replication_states().await; + assert!(table_states_after.contains_key(&p1_table_id)); + + // Verify the data from the detached partition was copied. + let table_rows = destination.get_table_rows().await; + let parent_rows: usize = table_rows + .get(&parent_table_id) + .map(|rows| rows.len()) + .unwrap_or(0); + assert_eq!(parent_rows, 2); + let detached_rows: usize = table_rows + .get(&p1_table_id) + .map(|rows| rows.len()) + .unwrap_or(0); + assert_eq!(detached_rows, 2); +} + +/// Tests that detached partitions are not automatically discovered with FOR TABLES IN SCHEMA publication. +/// Similar to FOR ALL TABLES, the detached partition appears in pg_publication_tables but is not +/// automatically discovered by the running pipeline without restart. +/// Requires PostgreSQL 15+ for FOR TABLES IN SCHEMA support. +#[tokio::test(flavor = "multi_thread")] +async fn partition_detach_with_schema_publication_does_not_replicate_detached_inserts() { + init_test_tracing(); + let database = spawn_source_database().await; + + // Skip test if PostgreSQL version is < 15 (FOR TABLES IN SCHEMA requires 15+). + if let Some(version) = database.server_version() { + if version.get() < 150000 { + eprintln!("Skipping test: PostgreSQL 15+ required for FOR TABLES IN SCHEMA"); + return; + } + } + + let table_name = test_table_name("partitioned_events_schema_detach"); + let partition_specs = [("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")]; + + let (parent_table_id, partition_table_ids) = + create_partitioned_table(&database, table_name.clone(), &partition_specs) + .await + .unwrap(); + + let p1_table_id = partition_table_ids[0]; + + database + .run_sql(&format!( + "insert into {} (data, partition_key) values ('event1', 50), ('event2', 150)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + // Create FOR TABLES IN SCHEMA publication. + let publication_name = "test_schema_pub_detach".to_string(); + database + .create_publication_for_all(&publication_name, Some(&table_name.schema)) + .await + .unwrap(); + + let state_store = NotifyingStore::new(); + let destination = TestDestinationWrapper::wrap(MemoryDestination::new()); + + let parent_sync_done = state_store + .notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone) + .await; + + let pipeline_id: PipelineId = random(); + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + publication_name.clone(), + state_store.clone(), + destination.clone(), + ); + + pipeline.start().await.unwrap(); + parent_sync_done.notified().await; + + // Verify initial state. + let table_states_before = state_store.get_table_replication_states().await; + assert!(table_states_before.contains_key(&parent_table_id)); + assert!(!table_states_before.contains_key(&p1_table_id)); + + // Detach partition p1. + let partition_p1_name = format!("{}_{}", table_name.name, "p1"); + let partition_p1_qualified = format!("{}.{}", table_name.schema, partition_p1_name); + database + .run_sql(&format!( + "alter table {} detach partition {}", + table_name.as_quoted_identifier(), + partition_p1_qualified + )) + .await + .unwrap(); + + // Verify catalog state. The detached partition should appear in pg_publication_tables. 
+ let pub_tables_check = database + .client + .as_ref() + .unwrap() + .query( + "select count(*) as cnt from pg_publication_tables + where pubname = $1 and tablename = $2", + &[&publication_name, &partition_p1_name], + ) + .await + .unwrap(); + let pub_tables_count: i64 = pub_tables_check[0].get("cnt"); + assert_eq!(pub_tables_count, 1); + + // Insert into detached partition. + database + .run_sql(&format!( + "insert into {partition_p1_qualified} (data, partition_key) values ('detached_event', 25)" + )) + .await + .unwrap(); + + // Wait for the parent table insert to be replicated. + let inserts_notify = destination + .wait_for_events_count(vec![(EventType::Insert, 1)]) + .await; + + // Insert into parent table (should be replicated). + database + .run_sql(&format!( + "insert into {} (data, partition_key) values ('parent_event', 125)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + inserts_notify.notified().await; + + let _ = pipeline.shutdown_and_wait().await; + + // The pipeline state should still only track the parent table. + let table_states_after = state_store.get_table_replication_states().await; + assert!(table_states_after.contains_key(&parent_table_id)); + + // Verify events. + let events = destination.get_events().await; + let grouped = group_events_by_type_and_table_id(&events); + + // Parent table should have 1 insert event. + let parent_inserts = grouped + .get(&(EventType::Insert, parent_table_id)) + .cloned() + .unwrap_or_default(); + assert_eq!(parent_inserts.len(), 1); + + // Detached partition inserts should NOT be replicated without table re-discovery. + let detached_inserts = grouped + .get(&(EventType::Insert, p1_table_id)) + .cloned() + .unwrap_or_default(); + assert_eq!(detached_inserts.len(), 0); +} + +/// Tests that a detached partition is discovered as a new table after pipeline restart +/// with FOR TABLES IN SCHEMA publication. After restart, the detached partition in the same +/// schema should be discovered and its data replicated. +/// Requires PostgreSQL 15+ for FOR TABLES IN SCHEMA support. +#[tokio::test(flavor = "multi_thread")] +async fn partition_detach_with_schema_publication_does_replicate_detached_inserts_on_restart() { + init_test_tracing(); + let database = spawn_source_database().await; + + // Skip test if PostgreSQL version is < 15 (FOR TABLES IN SCHEMA requires 15+). + if let Some(version) = database.server_version() { + if version.get() < 150000 { + eprintln!("Skipping test: PostgreSQL 15+ required for FOR TABLES IN SCHEMA"); + return; + } + } + + let table_name = test_table_name("partitioned_events_schema_restart"); + let partition_specs = [("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")]; + + let (parent_table_id, partition_table_ids) = + create_partitioned_table(&database, table_name.clone(), &partition_specs) + .await + .unwrap(); + + let p1_table_id = partition_table_ids[0]; + + database + .run_sql(&format!( + "insert into {} (data, partition_key) values ('event1', 50), ('event2', 150)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + // Create FOR TABLES IN SCHEMA publication. + let publication_name = "test_schema_pub_restart".to_string(); + database + .create_publication_for_all(&publication_name, Some(&table_name.schema)) + .await + .unwrap(); + + let state_store = NotifyingStore::new(); + let destination = TestDestinationWrapper::wrap(MemoryDestination::new()); + + // Start pipeline and wait for initial sync. 
+ let parent_sync_done = state_store + .notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone) + .await; + + let pipeline_id: PipelineId = random(); + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + publication_name.clone(), + state_store.clone(), + destination.clone(), + ); + + pipeline.start().await.unwrap(); + parent_sync_done.notified().await; + + // Verify initial state. + let table_states_before = state_store.get_table_replication_states().await; + assert!(table_states_before.contains_key(&parent_table_id)); + assert!(!table_states_before.contains_key(&p1_table_id)); + + // Detach partition p1. + let partition_p1_name = format!("{}_{}", table_name.name, "p1"); + let partition_p1_qualified = format!("{}.{}", table_name.schema, partition_p1_name); + database + .run_sql(&format!( + "alter table {} detach partition {}", + table_name.as_quoted_identifier(), + partition_p1_qualified + )) + .await + .unwrap(); + + // Insert into detached partition (while pipeline is still running). + database + .run_sql(&format!( + "insert into {partition_p1_qualified} (data, partition_key) values ('detached_event', 25)" + )) + .await + .unwrap(); + + // Shutdown the pipeline. + let _ = pipeline.shutdown_and_wait().await; + + // Restart the pipeline. It should now discover the detached partition as a new table. + let detached_sync_done = state_store + .notify_on_table_state_type(p1_table_id, TableReplicationPhaseType::SyncDone) + .await; + + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + publication_name.clone(), + state_store.clone(), + destination.clone(), + ); + + pipeline.start().await.unwrap(); + + // Wait for the detached partition to be synced. + detached_sync_done.notified().await; + + let _ = pipeline.shutdown_and_wait().await; + + // Verify the detached partition was discovered and synced. + let table_states_after = state_store.get_table_replication_states().await; + assert!(table_states_after.contains_key(&p1_table_id)); + + // Verify the data from the detached partition was copied. + let table_rows = destination.get_table_rows().await; + let parent_rows: usize = table_rows + .get(&parent_table_id) + .map(|rows| rows.len()) + .unwrap_or(0); + assert_eq!(parent_rows, 2); + let detached_rows: usize = table_rows + .get(&p1_table_id) + .map(|rows| rows.len()) + .unwrap_or(0); + assert_eq!(detached_rows, 2); +} + +/// Tests that nested partitions (sub-partitioned tables) work correctly. +/// Creates a two-level partition hierarchy where one partition is itself partitioned, +/// and verifies that both initial COPY and CDC streaming work correctly. +/// Only the top-level parent table should be tracked in the pipeline state. +#[tokio::test(flavor = "multi_thread")] +async fn nested_partitioned_table_copy_and_cdc() { + init_test_tracing(); + let database = spawn_source_database().await; + + let table_name = test_table_name("nested_partitioned_events"); + + // Create the parent partitioned table (Level 1). + // Primary key must include all partitioning columns used at any level. + database + .run_sql(&format!( + "create table {} ( + id bigserial, + data text NOT NULL, + partition_key integer NOT NULL, + sub_partition_key integer NOT NULL, + primary key (id, partition_key, sub_partition_key) + ) partition by range (partition_key)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + // Get parent table ID. 
+ let parent_row = database + .client + .as_ref() + .unwrap() + .query_one( + "select c.oid from pg_class c join pg_namespace n on n.oid = c.relnamespace + where n.nspname = $1 and c.relname = $2", + &[&table_name.schema, &table_name.name], + ) + .await + .unwrap(); + let parent_table_id: TableId = parent_row.get(0); + + // Create first partition (simple leaf partition) (Level 2a). + let p1_name = format!("{}_{}", table_name.name, "p1"); + let p1_qualified = format!("{}.{}", table_name.schema, p1_name); + database + .run_sql(&format!( + "create table {} partition of {} for values from (1) to (100)", + p1_qualified, + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + // Create second partition that is itself partitioned (Level 2b). + let p2_name = format!("{}_{}", table_name.name, "p2"); + let p2_qualified = format!("{}.{}", table_name.schema, p2_name); + database + .run_sql(&format!( + "create table {} partition of {} for values from (100) to (200) partition by range (sub_partition_key)", + p2_qualified, + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + // Create sub-partitions of p2 (Level 3). + let p2_sub1_name = format!("{}_{}", p2_name, "sub1"); + let p2_sub1_qualified = format!("{}.{}", table_name.schema, p2_sub1_name); + database + .run_sql(&format!( + "create table {p2_sub1_qualified} partition of {p2_qualified} for values from (1) to (50)" + )) + .await + .unwrap(); + + let p2_sub2_name = format!("{}_{}", p2_name, "sub2"); + let p2_sub2_qualified = format!("{}.{}", table_name.schema, p2_sub2_name); + database + .run_sql(&format!( + "create table {p2_sub2_qualified} partition of {p2_qualified} for values from (50) to (100)" + )) + .await + .unwrap(); + + // Insert initial data into different partitions: + // - event_p1 goes to the simple leaf partition p1 + // - event_p2_sub1 goes to nested partition p2 -> p2_sub1 + // - event_p2_sub2 goes to nested partition p2 -> p2_sub2 + database + .run_sql(&format!( + "insert into {} (data, partition_key, sub_partition_key) values + ('event_p1', 50, 25), + ('event_p2_sub1', 150, 25), + ('event_p2_sub2', 150, 75)", + table_name.as_quoted_identifier() + )) + .await + .unwrap(); + + let publication_name = "test_nested_partitioned_pub".to_string(); + database + .create_publication(&publication_name, std::slice::from_ref(&table_name)) + .await + .unwrap(); + + let state_store = NotifyingStore::new(); + let destination = TestDestinationWrapper::wrap(MemoryDestination::new()); + + // Register notification for initial copy completion. + let parent_sync_done = state_store + .notify_on_table_state_type(parent_table_id, TableReplicationPhaseType::SyncDone) + .await; + + let pipeline_id: PipelineId = random(); + let mut pipeline = create_pipeline( + &database.config, + pipeline_id, + publication_name, + state_store.clone(), + destination.clone(), + ); + + pipeline.start().await.unwrap(); + + parent_sync_done.notified().await; + + // Verify table schema was discovered correctly for nested partitioned table. + let table_schemas = state_store.get_table_schemas().await; + assert!(table_schemas.contains_key(&parent_table_id)); + + let parent_schema = &table_schemas[&parent_table_id]; + assert_eq!(parent_schema.id, parent_table_id); + assert_eq!(parent_schema.name, table_name); + + // Verify columns are correctly discovered (includes sub_partition_key). + assert_eq!(parent_schema.column_schemas.len(), 4); + + // Check id column (added by default). 
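+ // Column schemas are reported in declaration order, so indexes 0..=3 correspond to
+ // id, data, partition_key and sub_partition_key.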
+ let id_column = &parent_schema.column_schemas[0];
+ assert_eq!(id_column.name, "id");
+ assert_eq!(id_column.typ, Type::INT8);
+ assert!(!id_column.nullable);
+ assert!(id_column.primary);
+
+ // Check data column.
+ let data_column = &parent_schema.column_schemas[1];
+ assert_eq!(data_column.name, "data");
+ assert_eq!(data_column.typ, Type::TEXT);
+ assert!(!data_column.nullable);
+ assert!(!data_column.primary);
+
+ // Check partition_key column (part of primary key).
+ let partition_key_column = &parent_schema.column_schemas[2];
+ assert_eq!(partition_key_column.name, "partition_key");
+ assert_eq!(partition_key_column.typ, Type::INT4);
+ assert!(!partition_key_column.nullable);
+ assert!(partition_key_column.primary);
+
+ // Check sub_partition_key column (part of primary key for nested partitioning).
+ let sub_partition_key_column = &parent_schema.column_schemas[3];
+ assert_eq!(sub_partition_key_column.name, "sub_partition_key");
+ assert_eq!(sub_partition_key_column.typ, Type::INT4);
+ assert!(!sub_partition_key_column.nullable);
+ assert!(sub_partition_key_column.primary);
+
+ // Verify initial COPY replicated all 3 rows.
+ let table_rows = destination.get_table_rows().await;
+ let total_rows: usize = table_rows.values().map(|rows| rows.len()).sum();
+ assert_eq!(total_rows, 3);
+
+ // Verify only the parent table is tracked (not intermediate or leaf partitions).
+ let table_states = state_store.get_table_replication_states().await;
+ assert!(table_states.contains_key(&parent_table_id));
+ assert_eq!(table_states.len(), 1);
+
+ // Verify all rows are attributed to the parent table.
+ let parent_table_rows = table_rows
+ .iter()
+ .filter(|(table_id, _)| **table_id == parent_table_id)
+ .map(|(_, rows)| rows.len())
+ .sum::<usize>();
+ assert_eq!(parent_table_rows, 3);
+
+ // Insert new rows into different nested partitions.
+ let inserts_notify = destination
+ .wait_for_events_count(vec![(EventType::Insert, 3)])
+ .await;
+
+ database
+ .run_sql(&format!(
+ "insert into {} (data, partition_key, sub_partition_key) values
+ ('new_event_p1', 75, 30),
+ ('new_event_p2_sub1', 125, 40),
+ ('new_event_p2_sub2', 175, 60)",
+ table_name.as_quoted_identifier()
+ ))
+ .await
+ .unwrap();
+
+ inserts_notify.notified().await;
+
+ let _ = pipeline.shutdown_and_wait().await;
+
+ // Verify that events were captured for all nested partitions.
+ let events = destination.get_events().await;
+ let grouped = group_events_by_type_and_table_id(&events);
+ let parent_inserts = grouped
+ .get(&(EventType::Insert, parent_table_id))
+ .cloned()
+ .unwrap_or_default();
+ assert_eq!(parent_inserts.len(), 3);
+}
+
+/// Tests that the pipeline fails with an error during startup when `publish_via_partition_root`
+/// is set to `false` and the publication contains partitioned tables.
+///
+/// When `publish_via_partition_root = false`, logical replication messages contain child
+/// partition OIDs instead of parent table OIDs. Since the pipeline's schema cache only
+/// tracks parent table IDs, this configuration would cause pipeline failures when relation
+/// messages arrive with unknown child OIDs.
+///
+/// The pipeline validates this configuration at startup and rejects it with a clear error
+/// message instructing the user to enable `publish_via_partition_root`.
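+///
+/// As a quick manual check (a sketch against the standard `pg_publication` catalog, whose
+/// `pubviaroot` column reflects this setting), the publication can be inspected with:
+///
+/// ```sql
+/// select pubname, pubviaroot from pg_publication where pubname = 'test_partitioned_pub';
+/// ```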
+#[tokio::test(flavor = "multi_thread")]
+async fn partitioned_table_with_publish_via_partition_root_false_and_partitioned_tables() {
+ init_test_tracing();
+ let database = spawn_source_database().await;
+
+ let table_name = test_table_name("partitioned_events");
+ let partition_specs = [("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")];
+
+ let (_parent_table_id, _partition_table_ids) =
+ create_partitioned_table(&database, table_name.clone(), &partition_specs)
+ .await
+ .unwrap();
+
+ database
+ .run_sql(&format!(
+ "insert into {} (data, partition_key) values ('event1', 50), ('event2', 150)",
+ table_name.as_quoted_identifier()
+ ))
+ .await
+ .unwrap();
+
+ let publication_name = "test_partitioned_pub".to_string();
+ database
+ .create_publication_with_config(&publication_name, std::slice::from_ref(&table_name), false)
+ .await
+ .unwrap();
+
+ let state_store = NotifyingStore::new();
+ let destination = TestDestinationWrapper::wrap(MemoryDestination::new());
+
+ let pipeline_id: PipelineId = random();
+ let mut pipeline = create_pipeline(
+ &database.config,
+ pipeline_id,
+ publication_name.clone(),
+ state_store.clone(),
+ destination.clone(),
+ );
+
+ // The pipeline should fail to start due to invalid configuration.
+ let err = pipeline.start().await.err().unwrap();
+ assert_eq!(err.kind(), ErrorKind::ConfigError);
+}
+
+/// Tests that the pipeline starts without error when `publish_via_partition_root = false` and
+/// the publication contains no partitioned tables.
+#[tokio::test(flavor = "multi_thread")]
+async fn partitioned_table_with_publish_via_partition_root_false_and_no_partitioned_tables() {
+ init_test_tracing();
+ let database = spawn_source_database().await;
+
+ let table_name = test_table_name("non_partitioned_events");
+ database
+ .create_table(
+ table_name.clone(),
+ true,
+ &[("description", "text not null")],
+ )
+ .await
+ .unwrap();
+
+ let publication_name = "test_non_partitioned_pub".to_string();
+ database
+ .create_publication_with_config(&publication_name, std::slice::from_ref(&table_name), false)
+ .await
+ .unwrap();
+
+ let state_store = NotifyingStore::new();
+ let destination = TestDestinationWrapper::wrap(MemoryDestination::new());
+
+ let pipeline_id: PipelineId = random();
+ let mut pipeline = create_pipeline(
+ &database.config,
+ pipeline_id,
+ publication_name.clone(),
+ state_store.clone(),
+ destination.clone(),
+ );
+
+ // The pipeline should start and stop successfully.
+ pipeline.start().await.unwrap();
+ let result = pipeline.shutdown_and_wait().await;
+ assert!(result.is_ok());
+}
diff --git a/etl/tests/replication.rs b/etl/tests/replication.rs
index a1d9dda1e..21e7ee1c7 100644
--- a/etl/tests/replication.rs
+++ b/etl/tests/replication.rs
@@ -1,10 +1,13 @@
 #![cfg(feature = "test-utils")]
 
+use std::collections::HashSet;
+
 use etl::error::ErrorKind;
 use etl::replication::client::PgReplicationClient;
 use etl::test_utils::database::{spawn_source_database, test_table_name};
 use etl::test_utils::pipeline::test_slot_name;
 use etl::test_utils::table::assert_table_schema;
+use etl::test_utils::test_schema::create_partitioned_table;
 use etl_postgres::tokio::test_utils::{TableModification, id_column_schema};
 use etl_postgres::types::ColumnSchema;
 use etl_telemetry::tracing::init_test_tracing;
@@ -550,11 +553,47 @@ async fn test_publication_creation_and_check() {
 );
 
 // We check the table ids of the tables in the publication.
- let table_ids = parent_client + let table_ids: HashSet<_> = parent_client .get_publication_table_ids("my_publication") + .await + .unwrap() + .into_iter() + .collect(); + assert_eq!(table_ids, HashSet::from([table_1_id, table_2_id])); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_publication_table_ids_collapse_partitioned_root() { + init_test_tracing(); + let database = spawn_source_database().await; + + let client = PgReplicationClient::connect(database.config.clone()) .await .unwrap(); - assert_eq!(table_ids, vec![table_1_id, table_2_id]); + + // We create a partitioned parent with two child partitions. + let table_name = test_table_name("part_parent"); + let (parent_table_id, _children) = create_partitioned_table( + &database, + table_name.clone(), + &[("p1", "from (1) to (100)"), ("p2", "from (100) to (200)")], + ) + .await + .unwrap(); + + let publication_name = "pub_part_root"; + database + .create_publication(publication_name, std::slice::from_ref(&table_name)) + .await + .unwrap(); + + let id = client + .get_publication_table_ids(publication_name) + .await + .unwrap(); + + // We expect to get only the parent table id. + assert_eq!(id, vec![parent_table_id]); } #[tokio::test(flavor = "multi_thread")]