From 0f6613028a498ceb1c613c0d9bede27aeef19a7a Mon Sep 17 00:00:00 2001 From: Marcelo Altmann Date: Tue, 22 Aug 2023 16:30:35 -0300 Subject: [PATCH] replicatiors: Implemented --replication-tables-ignore Sometimes we have an extensive number of tables but only a few of them we want to avoid replication. Currently, we can only do this via --replication-tables, However, if we want to filter out just one table, we need to list all the ones we want, instead of the single one that we do not want. Added an option to blacklist the tables that are going to be replicated. This option is mutually exclusive with --replication-tables. Release-Note-Core: Added a --replication-tables-ignore flag that allows you to replicate all tables other than the ones explicity ignored. Thanks, altmannmarcelo! Change-Id: I916cf8f43e97c92f0264787f5dc7617a720832bd Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5865 Tested-by: Buildkite CI Reviewed-by: Luke Osborne --- database-utils/src/lib.rs | 5 ++ replicators/src/noria_adapter.rs | 2 + replicators/src/table_filter.rs | 133 +++++++++++++++++++++++++++++-- 3 files changed, 132 insertions(+), 8 deletions(-) diff --git a/database-utils/src/lib.rs b/database-utils/src/lib.rs index 9c2c14f637..1508f184a5 100644 --- a/database-utils/src/lib.rs +++ b/database-utils/src/lib.rs @@ -80,6 +80,10 @@ pub struct UpstreamConfig { #[serde(default)] pub replication_tables: Option, + #[clap(long, env = "REPLICATION_TABLES_IGNORE")] + #[serde(default)] + pub replication_tables_ignore: Option, + /// Sets the time (in seconds) between reports of progress snapshotting the database. A value /// of 0 disables reporting. #[clap(long, default_value = "30", hide = true)] @@ -140,6 +144,7 @@ impl Default for UpstreamConfig { replication_server_id: Default::default(), replicator_restart_timeout: Duration::from_secs(1), replication_tables: Default::default(), + replication_tables_ignore: Default::default(), snapshot_report_interval_secs: 30, ssl_root_cert: None, replication_pool_size: 50, diff --git a/replicators/src/noria_adapter.rs b/replicators/src/noria_adapter.rs index ee31aa8540..9b8f890708 100644 --- a/replicators/src/noria_adapter.rs +++ b/replicators/src/noria_adapter.rs @@ -287,6 +287,7 @@ impl NoriaAdapter { let table_filter = TableFilter::try_new( nom_sql::Dialect::MySQL, config.replication_tables.take(), + config.replication_tables_ignore.take(), mysql_options.db_name(), )?; @@ -497,6 +498,7 @@ impl NoriaAdapter { let table_filter = TableFilter::try_new( nom_sql::Dialect::PostgreSQL, config.replication_tables.take(), + config.replication_tables_ignore.take(), None, )?; diff --git a/replicators/src/table_filter.rs b/replicators/src/table_filter.rs index 7b6498b94f..08c2be6b65 100644 --- a/replicators/src/table_filter.rs +++ b/replicators/src/table_filter.rs @@ -2,7 +2,7 @@ use std::borrow::Borrow; use std::collections::{BTreeMap, BTreeSet}; use nom_locate::LocatedSpan; -use nom_sql::{replicator_table_list, Dialect, SqlIdentifier}; +use nom_sql::{replicator_table_list, Dialect, Relation, SqlIdentifier}; use readyset_errors::{ReadySetError, ReadySetResult}; use readyset_util::redacted::RedactedString; @@ -10,7 +10,8 @@ use readyset_util::redacted::RedactedString; /// list of tables that we explicitly want to filter out of replication. /// Tables may be filtered from replication in 2 ways: /// 1. All tables will be filtered other than the ones provided to the option --replication_tables, -/// if it is used +/// if it is used OR All tables will be replicated other than the ones provided to the option +/// --replication-tables-ignore, if it is used. /// 2. If we encounter a unrecoverable failure in replication for a table, we can filter out the /// table to keep the process running without that table, which is better than being stuck until /// we fix why that table isn't replicating. @@ -92,12 +93,70 @@ impl TableFilter { pub(crate) fn try_new( dialect: Dialect, filter_table_list: Option, + filter_table_list_ignore: Option, default_schema: Option<&str>, ) -> ReadySetResult { let default_schema = default_schema.map(SqlIdentifier::from); - let mut schemas = BTreeMap::new(); + if filter_table_list.is_some() && filter_table_list_ignore.is_some() { + return Err(ReadySetError::ReplicationFailed( + "Cannot use both --replication-tables and --replication-tables-ignore".to_string(), + )); + } + + let mut schemas: BTreeMap = BTreeMap::new(); + let mut schemas_ignore: BTreeMap = BTreeMap::new(); + + let mut filter_list_ignore: Vec = Vec::new(); + match filter_table_list_ignore { + None => readyset_util::redacted::RedactedString("".to_string()), + Some(t) => { + if t.as_str() == "*.*" { + return Err(ReadySetError::ReplicationFailed( + "Cannot filter out all tables".to_string(), + )); + } + filter_list_ignore = + match replicator_table_list(dialect)(LocatedSpan::new(t.as_bytes())) { + Ok((rem, tables)) if rem.is_empty() => tables, + _ => { + return Err(ReadySetError::ReplicationFailed( + "Unable to parse filtered ignored tables list".to_string(), + )) + } + }; + t + } + }; + + for table in filter_list_ignore { + let table_name = table.name; + let table_schema = + table + .schema + .or_else(|| default_schema.clone()) + .ok_or_else(|| { + ReadySetError::ReplicationFailed(format!( + "No database and no default database for table {table_name}" + )) + })?; + + if table_name == "*" { + schemas_ignore.insert(table_schema, ReplicateTableSpec::empty_all_tables()); + } else { + let tables = schemas_ignore + .entry(table_schema) + .or_insert_with(ReplicateTableSpec::empty); + tables.insert(table_name); + } + } + if !schemas_ignore.is_empty() { + return Ok(TableFilter { + explicitly_replicated: schemas, + replication_denied: schemas_ignore, + }); + } let filtered = match filter_table_list { None => { match default_schema { @@ -106,12 +165,12 @@ impl TableFilter { schemas.insert(default, ReplicateTableSpec::empty_all_tables()); return Ok(TableFilter { explicitly_replicated: schemas, - replication_denied: BTreeMap::new(), + replication_denied: schemas_ignore, }); } None => { // We will learn what the tables are by `update_table_list` at snapshot - // time since `for_all_schemas` is true. + // time since `for_all_schemas` is true and not explicit exclude table. return Ok(Self::for_all_tables()); } }; @@ -157,7 +216,7 @@ impl TableFilter { Ok(TableFilter { explicitly_replicated: schemas, - replication_denied: BTreeMap::new(), + replication_denied: schemas_ignore, }) } @@ -241,7 +300,8 @@ mod tests { #[test] fn empty_list() { - let filter = TableFilter::try_new(nom_sql::Dialect::MySQL, None, Some("noria")).unwrap(); + let filter = + TableFilter::try_new(nom_sql::Dialect::MySQL, None, None, Some("noria")).unwrap(); // By default should only allow all tables from the default schema assert!(filter.should_be_processed("noria", "table")); assert!(!filter.should_be_processed("readyset", "table")); @@ -252,6 +312,7 @@ mod tests { let filter = TableFilter::try_new( nom_sql::Dialect::MySQL, Some("*.*".to_string().into()), + None, Some("noria"), ) .unwrap(); @@ -261,7 +322,7 @@ mod tests { #[test] fn all_schemas_implicit() { - let filter = TableFilter::try_new(nom_sql::Dialect::MySQL, None, None).unwrap(); + let filter = TableFilter::try_new(nom_sql::Dialect::MySQL, None, None, None).unwrap(); assert!(filter.should_be_processed("noria", "table")); assert!(filter.should_be_processed("readyset", "table")); } @@ -271,6 +332,7 @@ mod tests { let filter = TableFilter::try_new( nom_sql::Dialect::MySQL, Some("t1,t2,t3".to_string().into()), + None, Some("noria"), ) .unwrap(); @@ -287,6 +349,7 @@ mod tests { let filter = TableFilter::try_new( nom_sql::Dialect::MySQL, Some("t1,noria.t2,readyset.t4,t3".to_string().into()), + None, Some("noria"), ) .unwrap(); @@ -303,6 +366,7 @@ mod tests { let filter = TableFilter::try_new( nom_sql::Dialect::MySQL, Some("noria.*, readyset.t4, t3".to_string().into()), + None, Some("noria"), ) .unwrap(); @@ -320,6 +384,7 @@ mod tests { let mut filter = TableFilter::try_new( nom_sql::Dialect::MySQL, Some("noria.*, readyset.t4, t3".to_string().into()), + None, Some("noria"), ) .unwrap(); @@ -336,4 +401,56 @@ mod tests { filter.deny_replication("readyset", "t4"); assert!(!filter.should_be_processed("readyset", "t4")); } + + #[test] + fn regular_list_ignore() { + let filter = TableFilter::try_new( + nom_sql::Dialect::MySQL, + None, + Some("t1,t2,t3".to_string().into()), + Some("noria"), + ) + .unwrap(); + // Tables with no schema belong to the default schema + assert!(!filter.should_be_processed("noria", "t1")); + assert!(!filter.should_be_processed("noria", "t2")); + assert!(!filter.should_be_processed("noria", "t3")); + assert!(filter.should_be_processed("noria", "t4")); + assert!(filter.should_be_processed("readyset", "table")); + } + + #[test] + fn mixed_list_ignore() { + let filter = TableFilter::try_new( + nom_sql::Dialect::MySQL, + None, + Some("t1,noria.t2,readyset.t4,t3".to_string().into()), + Some("noria"), + ) + .unwrap(); + assert!(!filter.should_be_processed("noria", "t1")); + assert!(!filter.should_be_processed("noria", "t2")); + assert!(!filter.should_be_processed("noria", "t3")); + assert!(filter.should_be_processed("noria", "t4")); + assert!(!filter.should_be_processed("readyset", "t4")); + assert!(filter.should_be_processed("readyset", "table")); + } + + #[test] + fn wildcard_ignore() { + let filter = TableFilter::try_new( + nom_sql::Dialect::MySQL, + None, + Some("noria.*, readyset.t4, t3".to_string().into()), + Some("noria"), + ) + .unwrap(); + // Namespace with a wildcard contains all tables + assert!(!filter.should_be_processed("noria", "t1")); + assert!(!filter.should_be_processed("noria", "t2")); + assert!(!filter.should_be_processed("noria", "t3")); + assert!(!filter.should_be_processed("noria", "t4")); + assert!(!filter.should_be_processed("readyset", "t4")); + assert!(filter.should_be_processed("readyset", "table")); + } }