Skip to content

Commit

Permalink
treewide: Add a description why a table cannot be replicated
Browse files Browse the repository at this point in the history
Updated the command "show readyset table" with a description column that
specifies why a particular table was not replicated.

Change-Id: Ib9066a2ce5802e4b29125ea8e790819a264ace22
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/6061
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne <luke@readyset.io>
  • Loading branch information
tbjuhasz authored and lukoktonos committed Sep 21, 2023
1 parent 81af01b commit a23caf3
Show file tree
Hide file tree
Showing 18 changed files with 419 additions and 144 deletions.
5 changes: 4 additions & 1 deletion nom-sql/src/lib.rs
Expand Up @@ -50,7 +50,10 @@ pub use self::set::{
pub use self::show::ShowStatement;
pub use self::sql_identifier::SqlIdentifier;
pub use self::sql_type::{EnumVariants, SqlType, SqlTypeArbitraryOptions};
pub use self::table::{replicator_table_list, Relation, TableExpr, TableExprInner};
pub use self::table::{
replicator_table_list, NonReplicatedRelation, NotReplicatedReason, Relation, TableExpr,
TableExprInner,
};
pub use self::transaction::StartTransactionStatement;
pub use self::update::UpdateStatement;
pub use self::use_statement::UseStatement;
Expand Down
109 changes: 106 additions & 3 deletions nom-sql/src/table.rs
@@ -1,6 +1,6 @@
use std::fmt::Display;
use std::hash::Hash;
use std::str;
use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::{fmt, str};

use nom::branch::alt;
use nom::bytes::complete::tag;
Expand Down Expand Up @@ -238,3 +238,106 @@ pub fn replicator_table_list(
) -> impl Fn(LocatedSpan<&[u8]>) -> NomSqlResult<&[u8], Vec<Relation>> {
move |i| separated_list1(ws_sep_comma, replicator_table_reference(dialect))(i)
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum NotReplicatedReason {
/// Configuration indicates that a table is not being replicated because
/// it was excluded by configuration.
Configuration,
/// TableDropped indicates that the table that was snapshotted has been dropped
/// and because of this it is no longer being replicated.
TableDropped,
/// Partitioned indicates that the table is a partitioned table which are not
/// supported by ReadySet.
Partitioned,
/// UnsupportedType indicates that a column type in the table is not supported.
/// This will only reference the first unsupported type. If there are more than
/// one in a single table they will not be mentioned.
UnsupportedType(String),
/// OtherError indicates that an error was observed that caused the replication to fail but
/// was not one of the previous type and was unexpected.
OtherError(String),
/// Default is a generic and is used when one of the above enums are not need.
Default,
}

impl NotReplicatedReason {
pub fn description(&self) -> String {
match self {
NotReplicatedReason::Configuration => "The table was either excluded from replicated-tables or included in replication-tables-ignore option.".to_string(),
NotReplicatedReason::TableDropped => "Table has been dropped.".to_string(),
NotReplicatedReason::Partitioned => "Partitioned tables are not supported.".to_string(),
NotReplicatedReason::UnsupportedType(reason) => {
let prefix = "Unsupported type:";
if let Some(start) = reason.find(prefix) {
let start_offset = start + prefix.len();
let type_name_raw = &reason[start_offset..];
let type_name = type_name_raw.trim(); // Trim whitespace
format!("Column type {} is not supported.", type_name)
} else {
"Column type unknown is not supported.".to_string()
}
},
NotReplicatedReason::OtherError(error) => format!("An unexpected replication error occurred: {}", error),
NotReplicatedReason::Default => "No specific reason provided.".to_string(),
}
}

pub fn from_string(reason: &String) -> Self {
if reason.contains("Unsupported type:") {
NotReplicatedReason::UnsupportedType(reason.to_string())
} else {
NotReplicatedReason::OtherError(reason.to_string())
}
}
}
impl Debug for NotReplicatedReason {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Configuration => write!(f, "Configuration"),
Self::TableDropped => write!(f, "TableDropped"),
Self::Partitioned => write!(f, "Partitioned"),
Self::UnsupportedType(s) => write!(f, "UnsupportedType({})", s),
Self::OtherError(s) => write!(f, "OtherError({})", s),
Self::Default => write!(f, ""),
}
}
}

/// NonReplicatedRelations is a struct that wraps Relations with a reason why
/// it is not a replicated relation.
#[derive(Clone, Serialize, Deserialize)]
pub struct NonReplicatedRelation {
pub name: Relation,
pub reason: NotReplicatedReason,
}

impl NonReplicatedRelation {
pub fn new(name: Relation) -> Self {
NonReplicatedRelation {
name,
reason: NotReplicatedReason::Default,
}
}
}
impl Hash for NonReplicatedRelation {
fn hash<H: Hasher>(&self, state: &mut H) {
self.name.hash(state);
}
}
impl Debug for NonReplicatedRelation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"NonReplicatedRelation {{ name: {:?}, reason: {:?} }}",
self.name, self.reason
)
}
}

impl PartialEq for NonReplicatedRelation {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
}
}
impl Eq for NonReplicatedRelation {}
25 changes: 21 additions & 4 deletions readyset-adapter/src/backend/noria_connector.rs
Expand Up @@ -1017,7 +1017,7 @@ impl NoriaConnector {

let schema = SelectSchema {
schema: Cow::Owned(
["table", "status"]
["table", "status", "description"]
.iter()
.map(|name| ColumnSchema {
column: nom_sql::Column {
Expand All @@ -1029,15 +1029,32 @@ impl NoriaConnector {
})
.collect(),
),
columns: Cow::Owned(vec!["table".into(), "replication status".into()]),
columns: Cow::Owned(vec![
"table".into(),
"replication status".into(),
"replication status description".into(),
]),
};

let data = statuses
.into_iter()
.map(|(tbl, status)| {
let replication_status_str = status.replication_status.to_string();
let replication_split = replication_status_str
.splitn(2, ": ")
.collect::<Vec<&str>>();
let (replication_status, description) =
if replication_split[0].starts_with("Not Replicated") {
(
replication_split[0].to_string(),
replication_split.get(1).unwrap_or(&"").to_string(),
)
} else {
(status.replication_status.to_string(), "".to_string())
};
vec![
tbl.display(self.parse_dialect).to_string().into(),
status.replication_status.to_string().into(),
replication_status.into(),
description.into(),
]
})
.collect::<Vec<_>>();
Expand Down
6 changes: 4 additions & 2 deletions readyset-client/src/controller.rs
Expand Up @@ -8,7 +8,7 @@ use std::time::{Duration, Instant};

use futures_util::future;
use hyper::client::HttpConnector;
use nom_sql::{CreateCacheStatement, Relation};
use nom_sql::{CreateCacheStatement, NonReplicatedRelation, Relation};
use parking_lot::RwLock;
use petgraph::graph::NodeIndex;
use readyset_errors::{
Expand Down Expand Up @@ -449,7 +449,9 @@ impl ReadySetHandle {
/// recorded via [`Change::AddNonReplicatedRelation`]).
///
/// [`Change::AddNonReplicatedRelation`]: readyset_client::recipe::changelist::Change::AddNonReplicatedRelation
pub async fn non_replicated_relations(&mut self) -> ReadySetResult<HashSet<Relation>> {
pub async fn non_replicated_relations(
&mut self,
) -> ReadySetResult<HashSet<NonReplicatedRelation>> {
self.simple_post_request("non_replicated_relations").await
}

Expand Down
6 changes: 3 additions & 3 deletions readyset-client/src/recipe/changelist.rs
Expand Up @@ -38,8 +38,8 @@ use dataflow_expression::Dialect;
use nom_locate::LocatedSpan;
use nom_sql::{
AlterTableStatement, CacheInner, CreateCacheStatement, CreateTableStatement,
CreateViewStatement, DropTableStatement, DropViewStatement, Relation, SelectStatement,
SqlIdentifier, SqlQuery,
CreateViewStatement, DropTableStatement, DropViewStatement, NonReplicatedRelation, Relation,
SelectStatement, SqlIdentifier, SqlQuery,
};
use readyset_data::DfType;
use readyset_errors::{internal, unsupported, ReadySetError, ReadySetResult};
Expand Down Expand Up @@ -359,7 +359,7 @@ pub enum Change {
/// This is important both to have better error messages for queries that select from
/// non-replicated relations, and to ensure we don't skip over these tables during schema
/// resolution, resulting in queries that read from tables in the wrong schema.
AddNonReplicatedRelation(Relation),
AddNonReplicatedRelation(NonReplicatedRelation),
/// Add a new view to the graph, represented by the given `CREATE VIEW` statement
CreateView(CreateViewStatement),
/// Add a new cached query to the graph
Expand Down
15 changes: 9 additions & 6 deletions readyset-client/src/table.rs
Expand Up @@ -15,7 +15,7 @@ use futures_util::stream::futures_unordered::FuturesUnordered;
use futures_util::stream::TryStreamExt;
use futures_util::{future, ready};
use itertools::Either;
use nom_sql::{CreateTableBody, Relation, SqlIdentifier};
use nom_sql::{CreateTableBody, NotReplicatedReason, Relation, SqlIdentifier};
use petgraph::graph::NodeIndex;
use readyset_data::DfValue;
use readyset_errors::{
Expand Down Expand Up @@ -272,10 +272,10 @@ impl fmt::Debug for PacketData {
}

/// The status of a single table with respect to replication
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum TableReplicationStatus {
/// The table is not being replicated by ReadySet
NotReplicated,
NotReplicated(NotReplicatedReason),
/// The table is currently being snapshotted by ReadySet
Snapshotting,
/// The table has been successfully snapshotted by ReadySet
Expand All @@ -285,10 +285,13 @@ pub enum TableReplicationStatus {
impl Display for TableReplicationStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TableReplicationStatus::NotReplicated => f.write_str("Not Replicated"),
TableReplicationStatus::Snapshotting => f.write_str("Snapshotting"),
TableReplicationStatus::Snapshotted => f.write_str("Snapshotted"),
TableReplicationStatus::NotReplicated(reason) => {
write!(f, "Not Replicated : {}", reason.description())?;
}
TableReplicationStatus::Snapshotting => f.write_str("Snapshotting")?,
TableReplicationStatus::Snapshotted => f.write_str("Snapshotted")?,
}
Ok(())
}
}

Expand Down
6 changes: 4 additions & 2 deletions readyset-data/src/type.rs
Expand Up @@ -306,8 +306,10 @@ impl DfType {
MacAddr => Self::MacAddr,
Inet => Self::Inet,
Citext => Self::Text(Collation::Citext),
Other(ref id) => resolve_custom_type(id.clone())
.ok_or_else(|| unsupported_err!("Unsupported type: {}", id.display_unquoted()))?,
Other(ref id) => resolve_custom_type(id.clone()).ok_or_else(|| {
let id_upper = format!("{}", id.display_unquoted()).to_uppercase();
unsupported_err!("Unsupported type: {}", id_upper)
})?,
})
}
}
Expand Down
59 changes: 40 additions & 19 deletions readyset-server/src/controller/mod.rs
Expand Up @@ -1162,10 +1162,13 @@ async fn handle_controller_request(
#[cfg(test)]
mod tests {

use std::collections::{BTreeMap, HashSet};
use std::collections::BTreeMap;

use dataflow::DomainIndex;
use nom_sql::{parse_create_table, parse_select_statement, Dialect, Relation};
use nom_sql::{
parse_create_table, parse_select_statement, Dialect, NonReplicatedRelation,
NotReplicatedReason, Relation,
};
use readyset_client::recipe::changelist::{Change, ChangeList};
use readyset_client::{KeyCount, TableReplicationStatus, TableStatus, ViewCreateRequest};
use readyset_data::Dialect as DataDialect;
Expand Down Expand Up @@ -1446,13 +1449,19 @@ mod tests {
noria
.extend_recipe(ChangeList::from_changes(
vec![
Change::AddNonReplicatedRelation(Relation {
schema: Some("s1".into()),
name: "t".into(),
Change::AddNonReplicatedRelation(NonReplicatedRelation {
name: Relation {
schema: Some("s1".into()),
name: "t".into(),
},
reason: NotReplicatedReason::Default,
}),
Change::AddNonReplicatedRelation(Relation {
schema: Some("s2".into()),
name: "t".into(),
Change::AddNonReplicatedRelation(NonReplicatedRelation {
name: Relation {
schema: Some("s2".into()),
name: "t".into(),
},
reason: NotReplicatedReason::Default,
}),
],
DataDialect::DEFAULT_MYSQL,
Expand All @@ -1461,19 +1470,26 @@ mod tests {
.unwrap();

let rels = noria.non_replicated_relations().await.unwrap();
assert_eq!(
rels,
HashSet::from([
Relation {

let expected = vec![
(NonReplicatedRelation {
name: Relation {
schema: Some("s1".into()),
name: "t".into(),
},
Relation {
reason: NotReplicatedReason::Default,
}),
(NonReplicatedRelation {
name: Relation {
schema: Some("s2".into()),
name: "t".into(),
},
])
);
reason: NotReplicatedReason::Default,
}),
]
.into_iter()
.collect();
assert_eq!(rels, expected);

shutdown_tx.shutdown().await;
}
Expand All @@ -1484,9 +1500,12 @@ mod tests {
noria
.extend_recipe(ChangeList::from_changes(
vec![
Change::AddNonReplicatedRelation(Relation {
schema: Some("s1".into()),
name: "t".into(),
Change::AddNonReplicatedRelation(NonReplicatedRelation {
name: Relation {
schema: Some("s1".into()),
name: "t".into(),
},
reason: NotReplicatedReason::Configuration,
}),
Change::CreateTable {
statement: parse_create_table(
Expand Down Expand Up @@ -1546,7 +1565,9 @@ mod tests {
name: "t".into(),
},
TableStatus {
replication_status: TableReplicationStatus::NotReplicated
replication_status: TableReplicationStatus::NotReplicated(
NotReplicatedReason::Configuration
)
}
),
(
Expand Down
10 changes: 6 additions & 4 deletions readyset-server/src/controller/sql/mir/mod.rs
Expand Up @@ -18,8 +18,9 @@ pub use mir::{Column, NodeIndex};
use nom_sql::analysis::ReferredColumns;
use nom_sql::{
BinaryOperator, ColumnSpecification, CompoundSelectOperator, CreateTableBody, Expr,
FieldDefinitionExpr, FieldReference, FunctionExpr, InValue, LimitClause, Literal, OrderClause,
OrderType, Relation, SelectStatement, SqlIdentifier, TableKey, UnaryOperator,
FieldDefinitionExpr, FieldReference, FunctionExpr, InValue, LimitClause, Literal,
NonReplicatedRelation, OrderClause, OrderType, Relation, SelectStatement, SqlIdentifier,
TableKey, UnaryOperator,
};
use petgraph::visit::Reversed;
use petgraph::Direction;
Expand Down Expand Up @@ -175,7 +176,7 @@ pub(super) struct SqlToMirConverter {
/// Set of relations (tables or views) that exist in the upstream database, but are not being
/// replicated (either due to lack of support, or because the user explicitly opted out from
/// them being replicated)
pub(in crate::controller::sql) non_replicated_relations: HashSet<Relation>,
pub(in crate::controller::sql) non_replicated_relations: HashSet<NonReplicatedRelation>,
}

impl SqlToMirConverter {
Expand Down Expand Up @@ -217,7 +218,8 @@ impl SqlToMirConverter {
/// upstream database but is not being replicated, or [`ReadySetError::TableNotFound`] if the
/// table is completely unknown
pub(super) fn table_not_found_err(&self, name: &Relation) -> ReadySetError {
if self.non_replicated_relations.contains(name) || is_catalog_table(name) {
let relation = NonReplicatedRelation::new(name.clone());
if self.non_replicated_relations.contains(&relation) || is_catalog_table(name) {
ReadySetError::TableNotReplicated {
name: (&name.name).into(),
schema: name.schema.as_ref().map(Into::into),
Expand Down

0 comments on commit a23caf3

Please sign in to comment.