replicators: Mark partitioned tables as non-replicated
We don't currently support PostgreSQL partitioned tables. The *way* we
don't support them is that we replicate both the parent table (at least
if it's created post-snapshot) and its partitions, but we never do the
query rewrite that would let queries against the parent hit the
partitions - effectively, queries that touch a partitioned table always
return empty result sets. Instead, this commit always marks partitioned
tables as non-replicated, both in snapshotting (where we were already
skipping them, since they have relkind `p` rather than `r`) and in DDL
replication. Note that we *are* still replicating the partitions
themselves, since they're just regular tables.

Fixes: ENG-2857
Change-Id: Ie6d6cb25b03365b17d3c25d14e5df7b3a206cd76
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/4800
Tested-by: Buildkite CI
Reviewed-by: Nick Marino <nick@readyset.io>
Reviewed-by: Ethan Donowitz <ethan@readyset.io>
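
For context, the distinction everything below hinges on is PostgreSQL's
`pg_class.relkind` column: a partitioned parent table has relkind `p`, while
its partitions are ordinary tables with relkind `r`. An illustrative psql
session (not part of this commit; the table names are made up):

    -- The parent is relkind 'p'; its partitions are plain 'r' tables.
    CREATE TABLE t (key int, val int) PARTITION BY RANGE (key);
    CREATE TABLE t_low PARTITION OF t FOR VALUES FROM (0) TO (100);

    SELECT relname, relkind
    FROM pg_catalog.pg_class
    WHERE relname IN ('t', 't_low');
    --  relname | relkind
    -- ---------+---------
    --  t       | p
    --  t_low   | r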
glittershark committed May 1, 2023
1 parent edb0b00 commit 2abffe1
Showing 4 changed files with 192 additions and 47 deletions.
30 changes: 30 additions & 0 deletions replicators/src/postgres_connector/ddl_replication.rs
@@ -145,6 +145,9 @@ pub(crate) enum DdlEventData {
columns: Vec<DdlCreateTableColumn>,
constraints: Vec<DdlCreateTableConstraint>,
},
AddNonReplicatedTable {
name: String,
},
AlterTable {
name: String,
#[serde(deserialize_with = "parse_alter_table_statement")]
@@ -243,6 +246,12 @@ impl DdlEvent {
Err(_) => Change::AddNonReplicatedRelation(table),
}
}
DdlEventData::AddNonReplicatedTable { name } => {
Change::AddNonReplicatedRelation(Relation {
schema: Some(self.schema.into()),
name: name.into(),
})
}
DdlEventData::AlterTable { name, statement } => {
let stmt = match statement {
Ok(stmt) => stmt,
@@ -590,6 +599,27 @@ mod tests {
client.teardown().await;
}

#[parallel_group(GROUP)]
#[tokio::test]
async fn create_partitioned_table() {
readyset_tracing::init_test_logging();
let client = setup("create_partitioned_table").await;

client
.simple_query("create table t1 (key int, val int) partition by range (key)")
.await
.unwrap();

let ddl = get_last_ddl(&client, "create_partitioned_table")
.await
.unwrap();

match ddl.data {
DdlEventData::AddNonReplicatedTable { name } => assert_eq!(name, "t1"),
data => panic!("Unexpected DDL event data: {data:?}"),
}
}

#[parallel_group(GROUP)]
#[tokio::test]
async fn alter_table() {
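
For reference, the new `AddNonReplicatedTable` variant deserializes a
DDL-event payload shaped roughly like the JSON below; the event trigger in
ddl_replication.sql (shown next) builds this kind of object for partitioned
tables. The schema and table names here are placeholder values:

    -- Sketch of the payload shape only; 'public' and 't1' are placeholders.
    SELECT json_build_object(
        'schema', 'public',
        'data', json_build_object(
            'AddNonReplicatedTable', json_build_object('name', 't1')
        )
    );
    -- {"schema" : "public", "data" : {"AddNonReplicatedTable" : {"name" : "t1"}}}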
97 changes: 54 additions & 43 deletions replicators/src/postgres_connector/ddl_replication.sql
@@ -63,53 +63,64 @@ BEGIN
END IF;

SELECT
json_build_object(
'schema', object.schema_name,
'data', json_build_object('CreateTable', json_build_object(
'name', (SELECT relname FROM pg_class WHERE oid = object.objid),
'columns', (
SELECT json_agg(json_build_object(
'name', attr.attname,
'column_type', pg_catalog.format_type(
attr.atttypid,
attr.atttypmod
),
'not_null', attr.attnotnull
) ORDER BY attr.attnum)
FROM pg_catalog.pg_attribute attr
WHERE attr.attrelid = object.objid
AND attr.attnum > 0
AND NOT attr.attisdropped
),
'constraints', (
SELECT coalesce(
json_agg(json_build_object('definition', def)),
'[]'
CASE cls.relkind
WHEN 'r' THEN
json_build_object(
'schema', object.schema_name,
'data', json_build_object('CreateTable', json_build_object(
'name', cls.relname,
'columns', (
SELECT json_agg(json_build_object(
'name', attr.attname,
'column_type', pg_catalog.format_type(
attr.atttypid,
attr.atttypmod
),
'not_null', attr.attnotnull
) ORDER BY attr.attnum)
FROM pg_catalog.pg_attribute attr
WHERE attr.attrelid = object.objid
AND attr.attnum > 0
AND NOT attr.attisdropped
),
'constraints', (
SELECT coalesce(
json_agg(json_build_object('definition', def)),
'[]'
)
FROM (
SELECT DISTINCT
pg_catalog.pg_get_constraintdef(con.oid, TRUE) AS def
FROM
pg_catalog.pg_class cls_index,
pg_catalog.pg_index idx
LEFT JOIN pg_catalog.pg_constraint con
ON con.conrelid = idx.indrelid
AND con.conindid = idx.indexrelid
AND con.contype IN ('f', 'p', 'u')
WHERE cls.oid = object.objid
AND cls.oid = idx.indrelid
AND idx.indexrelid = cls_index.oid
AND pg_catalog.pg_get_constraintdef(con.oid, TRUE)
IS NOT NULL
) def
)
FROM (
SELECT DISTINCT
pg_catalog.pg_get_constraintdef(con.oid, TRUE) AS def
FROM
pg_catalog.pg_class cls,
pg_catalog.pg_class cls_index,
pg_catalog.pg_index idx
LEFT JOIN pg_catalog.pg_constraint con
ON con.conrelid = idx.indrelid
AND con.conindid = idx.indexrelid
AND con.contype IN ('f', 'p', 'u')
WHERE cls.oid = object.objid
AND cls.oid = idx.indrelid
AND idx.indexrelid = cls_index.oid
AND pg_catalog.pg_get_constraintdef(con.oid, TRUE)
IS NOT NULL
) def
)
))
)
))
)
WHEN 'p' THEN
json_build_object(
'schema', object.schema_name,
'data', json_build_object('AddNonReplicatedTable', json_build_object(
'name', cls.relname
))
)
END
INTO create_message
FROM pg_event_trigger_ddl_commands() object
JOIN pg_class cls ON object.objid = cls.oid
WHERE object.object_type = 'table'
AND object.schema_name != 'pg_temp';
AND object.schema_name != 'pg_temp'
AND cls.relkind in ('r', 'p');

IF readyset.is_pre14() THEN
UPDATE readyset.ddl_replication_log SET "ddl" = create_message;
15 changes: 11 additions & 4 deletions replicators/src/postgres_connector/snapshot.rs
@@ -52,6 +52,7 @@ pub struct PostgresReplicator<'a> {
enum TableKind {
RegularTable,
View,
PartitionedTable,
}

#[derive(Debug, Clone)]
@@ -675,10 +676,15 @@ impl<'a> PostgresReplicator<'a> {
let view_list = self.get_table_list(TableKind::View).await?;
let custom_types = self.get_custom_types().await?;

let (table_list, non_replicated) = table_list.into_iter().partition::<Vec<_>, _>(|tbl| {
self.table_filter
.should_be_processed(tbl.schema.as_str(), tbl.name.as_str())
});
let (table_list, mut non_replicated) =
table_list.into_iter().partition::<Vec<_>, _>(|tbl| {
self.table_filter
.should_be_processed(tbl.schema.as_str(), tbl.name.as_str())
});

// We don't support partitioned tables (only the partitions themselves) so mark those as
// non-replicated as well
non_replicated.extend(self.get_table_list(TableKind::PartitionedTable).await?);

// We don't filter the view list by schemas since a view could be in schema 1 (that may not
// be replicated), but refer to only tables in schema 2 that are all replicated. If we try
@@ -948,6 +954,7 @@ impl<'a> PostgresReplicator<'a> {
let kind_code = match kind {
TableKind::RegularTable => 'r',
TableKind::View => 'v',
TableKind::PartitionedTable => 'p',
} as i8;

// We filter out tables that have any generated columns (pgcatalog.pg_attribute.attgenerated
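
The snapshot side reuses the same relkind code: `TableKind::PartitionedTable`
maps to `'p'`, so the existing `get_table_list` query can enumerate
partitioned parents, which are then appended to the non-replicated set. A
minimal catalog query in the same spirit (an illustrative sketch, not the
exact query snapshot.rs runs):

    -- List partitioned (parent) tables, skipping system schemas.
    SELECT nsp.nspname AS schema_name, cls.relname AS table_name
    FROM pg_catalog.pg_class cls
    JOIN pg_catalog.pg_namespace nsp ON nsp.oid = cls.relnamespace
    WHERE cls.relkind = 'p'
      AND nsp.nspname NOT IN ('pg_catalog', 'information_schema');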
97 changes: 97 additions & 0 deletions replicators/tests/tests.rs
@@ -2487,3 +2487,100 @@ async fn pgsql_delete_from_table_without_pk() {

shutdown_tx.shutdown().await;
}

#[tokio::test(flavor = "multi_thread")]
async fn pgsql_dont_replicate_partitioned_table() {
readyset_tracing::init_test_logging();
let url = pgsql_url();
let mut client = DbConnection::connect(&url).await.unwrap();

client
.query(
"DROP TABLE IF EXISTS t CASCADE;
DROP TABLE IF EXISTS t_true CASCADE;
DROP TABLE IF EXISTS t_false CASCADE;
CREATE TABLE t (key bool not null, val int) PARTITION BY LIST (key);
CREATE TABLE t_true PARTITION OF t FOR VALUES IN (true);
CREATE TABLE t_false PARTITION OF t FOR VALUES IN (false);
INSERT INTO t (key, val) VALUES
(true, 1),
(true, 2),
(false, 10),
(false, 20);",
)
.await
.unwrap();

let (mut ctx, shutdown_tx) = TestHandle::start_noria(url.to_string(), None)
.await
.unwrap();
ctx.ready_notify.as_ref().unwrap().notified().await;

ctx.noria.table("t").await.unwrap_err();
assert!(ctx
.noria
.non_replicated_relations()
.await
.unwrap()
.contains(&Relation {
schema: Some("public".into()),
name: "t".into()
}));

ctx.noria
.table(Relation {
schema: Some("public".into()),
name: "t_true".into(),
})
.await
.unwrap();
ctx.noria
.table(Relation {
schema: Some("public".into()),
name: "t_false".into(),
})
.await
.unwrap();

ctx.check_results(
"t_true",
"pgsql_dont_replicate_partitioned_table",
&[
&[DfValue::from(true), DfValue::from(1)],
&[DfValue::from(true), DfValue::from(2)],
],
)
.await
.unwrap();
ctx.check_results(
"t_false",
"pgsql_dont_replicate_partitioned_table",
&[
&[DfValue::from(false), DfValue::from(10)],
&[DfValue::from(false), DfValue::from(20)],
],
)
.await
.unwrap();

client
.query("CREATE TABLE t2 (key int, val int) PARTITION BY RANGE (key)")
.await
.unwrap();

eventually! {
ctx
.noria
.non_replicated_relations()
.await
.unwrap()
.contains(&Relation {
schema: Some("public".into()),
name: "t2".into()
})
}

shutdown_tx.shutdown().await;
}
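
The partitions alone carry all of the data upstream, since every row of a
partitioned table is physically stored in one of its leaf partitions - which
is why replicating just `t_true` and `t_false` is enough for the checks
above. An illustrative query against the upstream database (not part of the
test):

    -- tableoid::regclass reports the partition each row is stored in.
    SELECT tableoid::regclass AS part, key, val FROM t ORDER BY val;
    --   part    | key | val
    -- ----------+-----+-----
    --  t_true   | t   |   1
    --  t_true   | t   |   2
    --  t_false  | f   |  10
    --  t_false  | f   |  20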
