Skip to content

Commit

Permalink
replicatiors: Implemented --replication-tables-ignore
Browse files Browse the repository at this point in the history
Sometimes we have an extensive number of tables but only a few of them
we want to avoid replication. Currently, we can only do this via
--replication-tables, However, if we want to filter out just one table,
we need to list all the ones we want, instead of the single one that we
do not want.

Added an option to blacklist the tables that are going to be
replicated.
This option is mutually exclusive with --replication-tables.

Release-Note-Core: Added a --replication-tables-ignore flag that allows
  you to replicate all tables other than the ones explicity ignored.
  Thanks, altmannmarcelo!

Change-Id: I916cf8f43e97c92f0264787f5dc7617a720832bd
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5865
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne <luke@readyset.io>
  • Loading branch information
altmannmarcelo authored and lukoktonos committed Aug 23, 2023
1 parent cf0990f commit 0f66130
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 8 deletions.
5 changes: 5 additions & 0 deletions database-utils/src/lib.rs
Expand Up @@ -80,6 +80,10 @@ pub struct UpstreamConfig {
#[serde(default)]
pub replication_tables: Option<RedactedString>,

#[clap(long, env = "REPLICATION_TABLES_IGNORE")]
#[serde(default)]
pub replication_tables_ignore: Option<RedactedString>,

This comment has been minimized.

Copy link
@glittershark

glittershark Aug 23, 2023

Member

can we get a docstring for this?


/// Sets the time (in seconds) between reports of progress snapshotting the database. A value
/// of 0 disables reporting.
#[clap(long, default_value = "30", hide = true)]
Expand Down Expand Up @@ -140,6 +144,7 @@ impl Default for UpstreamConfig {
replication_server_id: Default::default(),
replicator_restart_timeout: Duration::from_secs(1),
replication_tables: Default::default(),
replication_tables_ignore: Default::default(),
snapshot_report_interval_secs: 30,
ssl_root_cert: None,
replication_pool_size: 50,
Expand Down
2 changes: 2 additions & 0 deletions replicators/src/noria_adapter.rs
Expand Up @@ -287,6 +287,7 @@ impl NoriaAdapter {
let table_filter = TableFilter::try_new(
nom_sql::Dialect::MySQL,
config.replication_tables.take(),
config.replication_tables_ignore.take(),
mysql_options.db_name(),
)?;

Expand Down Expand Up @@ -497,6 +498,7 @@ impl NoriaAdapter {
let table_filter = TableFilter::try_new(
nom_sql::Dialect::PostgreSQL,
config.replication_tables.take(),
config.replication_tables_ignore.take(),
None,
)?;

Expand Down
133 changes: 125 additions & 8 deletions replicators/src/table_filter.rs
Expand Up @@ -2,15 +2,16 @@ use std::borrow::Borrow;
use std::collections::{BTreeMap, BTreeSet};

use nom_locate::LocatedSpan;
use nom_sql::{replicator_table_list, Dialect, SqlIdentifier};
use nom_sql::{replicator_table_list, Dialect, Relation, SqlIdentifier};
use readyset_errors::{ReadySetError, ReadySetResult};
use readyset_util::redacted::RedactedString;

/// A [`TableFilter`] keeps lists of all the tables readyset-server is interested in, as well as a
/// list of tables that we explicitly want to filter out of replication.
/// Tables may be filtered from replication in 2 ways:
/// 1. All tables will be filtered other than the ones provided to the option --replication_tables,
/// if it is used
/// if it is used OR All tables will be replicated other than the ones provided to the option
/// --replication-tables-ignore, if it is used.
/// 2. If we encounter a unrecoverable failure in replication for a table, we can filter out the
/// table to keep the process running without that table, which is better than being stuck until
/// we fix why that table isn't replicating.
Expand Down Expand Up @@ -92,12 +93,70 @@ impl TableFilter {
pub(crate) fn try_new(
dialect: Dialect,
filter_table_list: Option<RedactedString>,
filter_table_list_ignore: Option<RedactedString>,
default_schema: Option<&str>,
) -> ReadySetResult<TableFilter> {
let default_schema = default_schema.map(SqlIdentifier::from);

let mut schemas = BTreeMap::new();
if filter_table_list.is_some() && filter_table_list_ignore.is_some() {
return Err(ReadySetError::ReplicationFailed(
"Cannot use both --replication-tables and --replication-tables-ignore".to_string(),
));
}

let mut schemas: BTreeMap<SqlIdentifier, ReplicateTableSpec> = BTreeMap::new();
let mut schemas_ignore: BTreeMap<SqlIdentifier, ReplicateTableSpec> = BTreeMap::new();

let mut filter_list_ignore: Vec<Relation> = Vec::new();
match filter_table_list_ignore {
None => readyset_util::redacted::RedactedString("".to_string()),
Some(t) => {
if t.as_str() == "*.*" {
return Err(ReadySetError::ReplicationFailed(
"Cannot filter out all tables".to_string(),
));
}

filter_list_ignore =
match replicator_table_list(dialect)(LocatedSpan::new(t.as_bytes())) {
Ok((rem, tables)) if rem.is_empty() => tables,
_ => {
return Err(ReadySetError::ReplicationFailed(
"Unable to parse filtered ignored tables list".to_string(),
))
}
};
t
}
};

for table in filter_list_ignore {
let table_name = table.name;
let table_schema =
table
.schema
.or_else(|| default_schema.clone())
.ok_or_else(|| {
ReadySetError::ReplicationFailed(format!(
"No database and no default database for table {table_name}"
))
})?;

if table_name == "*" {
schemas_ignore.insert(table_schema, ReplicateTableSpec::empty_all_tables());
} else {
let tables = schemas_ignore
.entry(table_schema)
.or_insert_with(ReplicateTableSpec::empty);
tables.insert(table_name);
}
}
if !schemas_ignore.is_empty() {
return Ok(TableFilter {
explicitly_replicated: schemas,
replication_denied: schemas_ignore,
});
}
let filtered = match filter_table_list {
None => {
match default_schema {
Expand All @@ -106,12 +165,12 @@ impl TableFilter {
schemas.insert(default, ReplicateTableSpec::empty_all_tables());
return Ok(TableFilter {
explicitly_replicated: schemas,
replication_denied: BTreeMap::new(),
replication_denied: schemas_ignore,
});
}
None => {
// We will learn what the tables are by `update_table_list` at snapshot
// time since `for_all_schemas` is true.
// time since `for_all_schemas` is true and not explicit exclude table.
return Ok(Self::for_all_tables());
}
};
Expand Down Expand Up @@ -157,7 +216,7 @@ impl TableFilter {

Ok(TableFilter {
explicitly_replicated: schemas,
replication_denied: BTreeMap::new(),
replication_denied: schemas_ignore,
})
}

Expand Down Expand Up @@ -241,7 +300,8 @@ mod tests {

#[test]
fn empty_list() {
let filter = TableFilter::try_new(nom_sql::Dialect::MySQL, None, Some("noria")).unwrap();
let filter =
TableFilter::try_new(nom_sql::Dialect::MySQL, None, None, Some("noria")).unwrap();
// By default should only allow all tables from the default schema
assert!(filter.should_be_processed("noria", "table"));
assert!(!filter.should_be_processed("readyset", "table"));
Expand All @@ -252,6 +312,7 @@ mod tests {
let filter = TableFilter::try_new(
nom_sql::Dialect::MySQL,
Some("*.*".to_string().into()),
None,
Some("noria"),
)
.unwrap();
Expand All @@ -261,7 +322,7 @@ mod tests {

#[test]
fn all_schemas_implicit() {
let filter = TableFilter::try_new(nom_sql::Dialect::MySQL, None, None).unwrap();
let filter = TableFilter::try_new(nom_sql::Dialect::MySQL, None, None, None).unwrap();
assert!(filter.should_be_processed("noria", "table"));
assert!(filter.should_be_processed("readyset", "table"));
}
Expand All @@ -271,6 +332,7 @@ mod tests {
let filter = TableFilter::try_new(
nom_sql::Dialect::MySQL,
Some("t1,t2,t3".to_string().into()),
None,
Some("noria"),
)
.unwrap();
Expand All @@ -287,6 +349,7 @@ mod tests {
let filter = TableFilter::try_new(
nom_sql::Dialect::MySQL,
Some("t1,noria.t2,readyset.t4,t3".to_string().into()),
None,
Some("noria"),
)
.unwrap();
Expand All @@ -303,6 +366,7 @@ mod tests {
let filter = TableFilter::try_new(
nom_sql::Dialect::MySQL,
Some("noria.*, readyset.t4, t3".to_string().into()),
None,
Some("noria"),
)
.unwrap();
Expand All @@ -320,6 +384,7 @@ mod tests {
let mut filter = TableFilter::try_new(
nom_sql::Dialect::MySQL,
Some("noria.*, readyset.t4, t3".to_string().into()),
None,
Some("noria"),
)
.unwrap();
Expand All @@ -336,4 +401,56 @@ mod tests {
filter.deny_replication("readyset", "t4");
assert!(!filter.should_be_processed("readyset", "t4"));
}

#[test]
fn regular_list_ignore() {
let filter = TableFilter::try_new(
nom_sql::Dialect::MySQL,
None,
Some("t1,t2,t3".to_string().into()),
Some("noria"),
)
.unwrap();
// Tables with no schema belong to the default schema
assert!(!filter.should_be_processed("noria", "t1"));
assert!(!filter.should_be_processed("noria", "t2"));
assert!(!filter.should_be_processed("noria", "t3"));
assert!(filter.should_be_processed("noria", "t4"));
assert!(filter.should_be_processed("readyset", "table"));
}

#[test]
fn mixed_list_ignore() {
let filter = TableFilter::try_new(
nom_sql::Dialect::MySQL,
None,
Some("t1,noria.t2,readyset.t4,t3".to_string().into()),
Some("noria"),
)
.unwrap();
assert!(!filter.should_be_processed("noria", "t1"));
assert!(!filter.should_be_processed("noria", "t2"));
assert!(!filter.should_be_processed("noria", "t3"));
assert!(filter.should_be_processed("noria", "t4"));
assert!(!filter.should_be_processed("readyset", "t4"));
assert!(filter.should_be_processed("readyset", "table"));
}

#[test]
fn wildcard_ignore() {
let filter = TableFilter::try_new(
nom_sql::Dialect::MySQL,
None,
Some("noria.*, readyset.t4, t3".to_string().into()),
Some("noria"),
)
.unwrap();
// Namespace with a wildcard contains all tables
assert!(!filter.should_be_processed("noria", "t1"));
assert!(!filter.should_be_processed("noria", "t2"));
assert!(!filter.should_be_processed("noria", "t3"));
assert!(!filter.should_be_processed("noria", "t4"));
assert!(!filter.should_be_processed("readyset", "t4"));
assert!(filter.should_be_processed("readyset", "table"));
}
}

0 comments on commit 0f66130

Please sign in to comment.