diff --git a/etl/Cargo.toml b/etl/Cargo.toml index 0e78b6a05..e28ce4f26 100644 --- a/etl/Cargo.toml +++ b/etl/Cargo.toml @@ -28,7 +28,6 @@ required-features = ["failpoints", "test-utils"] config = { workspace = true } postgres = { workspace = true, features = ["tokio", "replication"] } -async-trait = { workspace = true } bigdecimal = { workspace = true, features = ["std"] } bytes = { workspace = true } byteorder = { workspace = true } @@ -41,17 +40,14 @@ gcp-bigquery-client = { workspace = true, optional = true, features = [ ] } pg_escape = { workspace = true } pin-project-lite = { workspace = true } -postgres-protocol = { workspace = true } postgres-replication = { workspace = true } prost = { workspace = true, optional = true } rustls = { workspace = true, features = ["aws-lc-rs", "logging"] } rustls-pemfile = { workspace = true, features = ["std"] } rand = { workspace = true, features = ["std"] } secrecy = { workspace = true } -serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true, features = ["std"] } sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] } -thiserror = { workspace = true } tokio = { workspace = true, features = [ "rt-multi-thread", "macros", diff --git a/etl/benches/table_copies.rs b/etl/benches/table_copies.rs index 46a34b0cc..335f515ab 100644 --- a/etl/benches/table_copies.rs +++ b/etl/benches/table_copies.rs @@ -36,7 +36,7 @@ use clap::{Parser, Subcommand, ValueEnum}; use config::shared::{BatchConfig, PgConnectionConfig, PipelineConfig, RetryConfig, TlsConfig}; use etl::{ conversions::{event::Event, table_row::TableRow}, - destination::base::{Destination, DestinationError}, + destination::base::Destination, pipeline::Pipeline, state::{store::notify::NotifyingStateStore, table::TableReplicationPhaseType}, }; @@ -45,6 +45,7 @@ use sqlx::postgres::PgPool; #[cfg(feature = "bigquery")] use etl::destination::bigquery::BigQueryDestination; +use etl::error::EtlResult; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -435,10 +436,7 @@ enum BenchDestination { } impl Destination for BenchDestination { - async fn inject( - &self, - schema_cache: etl::schema::cache::SchemaCache, - ) -> Result<(), DestinationError> { + async fn inject(&self, schema_cache: etl::schema::cache::SchemaCache) -> EtlResult<()> { match self { BenchDestination::Null(dest) => dest.inject(schema_cache).await, #[cfg(feature = "bigquery")] @@ -446,7 +444,7 @@ impl Destination for BenchDestination { } } - async fn write_table_schema(&self, table_schema: TableSchema) -> Result<(), DestinationError> { + async fn write_table_schema(&self, table_schema: TableSchema) -> EtlResult<()> { match self { BenchDestination::Null(dest) => dest.write_table_schema(table_schema).await, #[cfg(feature = "bigquery")] @@ -454,7 +452,7 @@ impl Destination for BenchDestination { } } - async fn load_table_schemas(&self) -> Result, DestinationError> { + async fn load_table_schemas(&self) -> EtlResult> { match self { BenchDestination::Null(dest) => dest.load_table_schemas().await, #[cfg(feature = "bigquery")] @@ -466,7 +464,7 @@ impl Destination for BenchDestination { &self, table_id: TableId, table_rows: Vec, - ) -> Result<(), DestinationError> { + ) -> EtlResult<()> { match self { BenchDestination::Null(dest) => dest.write_table_rows(table_id, table_rows).await, #[cfg(feature = "bigquery")] @@ -474,7 +472,7 @@ impl Destination for BenchDestination { } } - async fn write_events(&self, events: Vec) -> Result<(), DestinationError> { + async fn write_events(&self, events: Vec) -> EtlResult<()> { match self { BenchDestination::Null(dest) => dest.write_events(events).await, #[cfg(feature = "bigquery")] @@ -484,18 +482,15 @@ impl Destination for BenchDestination { } impl Destination for NullDestination { - async fn inject( - &self, - _schema_cache: etl::schema::cache::SchemaCache, - ) -> Result<(), DestinationError> { + async fn inject(&self, _schema_cache: etl::schema::cache::SchemaCache) -> EtlResult<()> { Ok(()) } - async fn write_table_schema(&self, _table_schema: TableSchema) -> Result<(), DestinationError> { + async fn write_table_schema(&self, _table_schema: TableSchema) -> EtlResult<()> { Ok(()) } - async fn load_table_schemas(&self) -> Result, DestinationError> { + async fn load_table_schemas(&self) -> EtlResult> { Ok(vec![]) } @@ -503,11 +498,11 @@ impl Destination for NullDestination { &self, _table_id: TableId, _table_rows: Vec, - ) -> Result<(), DestinationError> { + ) -> EtlResult<()> { Ok(()) } - async fn write_events(&self, _events: Vec) -> Result<(), DestinationError> { + async fn write_events(&self, _events: Vec) -> EtlResult<()> { Ok(()) } } diff --git a/etl/src/clients/bigquery.rs b/etl/src/clients/bigquery.rs index 394b70fcc..183b83c30 100644 --- a/etl/src/clients/bigquery.rs +++ b/etl/src/clients/bigquery.rs @@ -1,5 +1,5 @@ +use crate::error::EtlResult; use futures::StreamExt; -use gcp_bigquery_client::google::cloud::bigquery::storage::v1::RowError; use gcp_bigquery_client::storage::{ColumnMode, StorageApi}; use gcp_bigquery_client::yup_oauth2::parse_service_account_key; use gcp_bigquery_client::{ @@ -10,7 +10,6 @@ use gcp_bigquery_client::{ }; use postgres::schema::ColumnSchema; use std::fmt; -use thiserror::Error; use tokio_postgres::types::Type; use tracing::info; @@ -52,32 +51,6 @@ impl fmt::Display for BigQueryOperationType { } } -/// Collection of row errors returned from BigQuery streaming operations. -#[derive(Debug, Error)] -pub struct RowErrors(pub Vec); - -impl fmt::Display for RowErrors { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if !self.0.is_empty() { - for row_error in self.0.iter() { - writeln!(f, "{row_error:?}")?; - } - } - - Ok(()) - } -} - -/// Errors that can occur when interacting with BigQuery. -#[derive(Debug, Error)] -pub enum BigQueryClientError { - #[error("An error occurred with BigQuery: {0}")] - BigQuery(#[from] BQError), - - #[error("One or multiple errors: {0}")] - AppendRowErrors(#[from] RowErrors), -} - /// A client for interacting with Google BigQuery. /// /// This client provides methods for managing tables, inserting data, @@ -95,7 +68,7 @@ impl BigQueryClient { pub async fn new_with_key_path( project_id: String, sa_key_path: &str, - ) -> Result { + ) -> EtlResult { let client = Client::from_service_account_key_file(sa_key_path).await?; Ok(BigQueryClient { project_id, client }) @@ -105,10 +78,7 @@ impl BigQueryClient { /// /// Parses the provided service account key string to authenticate with the /// BigQuery API. - pub async fn new_with_key( - project_id: String, - sa_key: &str, - ) -> Result { + pub async fn new_with_key(project_id: String, sa_key: &str) -> EtlResult { let sa_key = parse_service_account_key(sa_key).map_err(BQError::from)?; let client = Client::from_service_account_key(sa_key, false).await?; @@ -130,7 +100,7 @@ impl BigQueryClient { table_id: &str, column_schemas: &[ColumnSchema], max_staleness_mins: Option, - ) -> Result { + ) -> EtlResult { if self.table_exists(dataset_id, table_id).await? { return Ok(false); } @@ -148,7 +118,7 @@ impl BigQueryClient { table_id: &str, column_schemas: &[ColumnSchema], max_staleness_mins: Option, - ) -> Result<(), BigQueryClientError> { + ) -> EtlResult<()> { let full_table_name = self.full_table_name(dataset_id, table_id); let columns_spec = Self::create_columns_spec(column_schemas); @@ -168,11 +138,7 @@ impl BigQueryClient { } /// Truncates a table in a BigQuery dataset. - pub async fn truncate_table( - &self, - dataset_id: &str, - table_id: &str, - ) -> Result<(), BigQueryClientError> { + pub async fn truncate_table(&self, dataset_id: &str, table_id: &str) -> EtlResult<()> { let full_table_name = self.full_table_name(dataset_id, table_id); info!("Truncating table {full_table_name} in BigQuery"); @@ -189,11 +155,7 @@ impl BigQueryClient { /// # Panics /// /// Panics if the query result does not contain the expected `table_exists` column. - pub async fn table_exists( - &self, - dataset_id: &str, - table_id: &str, - ) -> Result { + pub async fn table_exists(&self, dataset_id: &str, table_id: &str) -> EtlResult { let table = self .client .table() @@ -216,7 +178,7 @@ impl BigQueryClient { table_id: String, table_descriptor: &TableDescriptor, table_rows: Vec, - ) -> Result<(), BigQueryClientError> { + ) -> EtlResult<()> { // We create a slice on table rows, which will be updated while the streaming progresses. // // Using a slice allows us to deallocate the vector only at the end of streaming, which leads @@ -242,9 +204,7 @@ impl BigQueryClient { if let Some(append_rows_response) = append_rows_stream.next().await { let append_rows_response = append_rows_response.map_err(BQError::from)?; if !append_rows_response.row_errors.is_empty() { - return Err(BigQueryClientError::AppendRowErrors(RowErrors( - append_rows_response.row_errors, - ))); + return Err(append_rows_response.row_errors.into()); } } @@ -258,7 +218,7 @@ impl BigQueryClient { } /// Executes an SQL query and returns the result set. - pub async fn query(&self, request: QueryRequest) -> Result { + pub async fn query(&self, request: QueryRequest) -> EtlResult { let query_response = self.client.job().query(&self.project_id, request).await?; Ok(ResultSet::new_from_query_response(query_response)) diff --git a/etl/src/concurrency/future.rs b/etl/src/concurrency/future.rs index 114e86b3d..35df4562e 100644 --- a/etl/src/concurrency/future.rs +++ b/etl/src/concurrency/future.rs @@ -59,7 +59,7 @@ where /// /// The callback will be notified of either success or failure, and will receive the provided /// identifier to track which future completed. - pub fn new(future: Fut, id: I, callback_source: Arc>) -> Self { + pub fn wrap(future: Fut, id: I, callback_source: Arc>) -> Self { Self { future: AssertUnwindSafe(future).catch_unwind(), callback_fut: None, @@ -271,7 +271,7 @@ mod tests { let callback = Arc::new(Mutex::new(MockCallback::default())); let table_id = 1; - let future = ReactiveFuture::new(ImmediateFuture, table_id, callback.clone()); + let future = ReactiveFuture::wrap(ImmediateFuture, table_id, callback.clone()); future.await.unwrap(); let guard = callback.lock().await; @@ -285,7 +285,7 @@ mod tests { let table_id = 1; // Test string panic - let future = AssertUnwindSafe(ReactiveFuture::new( + let future = AssertUnwindSafe(ReactiveFuture::wrap( PanicFuture { panic_any: false }, table_id, callback.clone(), @@ -312,7 +312,7 @@ mod tests { drop(guard); // Test any panic - let future = AssertUnwindSafe(ReactiveFuture::new( + let future = AssertUnwindSafe(ReactiveFuture::wrap( PanicFuture { panic_any: true }, table_id, callback.clone(), @@ -335,7 +335,7 @@ mod tests { let table_id = 1; let pending_future = PendingFuture::new(); - let future = ReactiveFuture::new(pending_future, table_id, callback.clone()); + let future = ReactiveFuture::wrap(pending_future, table_id, callback.clone()); // First poll should return pending let mut pinned_future = Box::pin(future); @@ -364,7 +364,7 @@ mod tests { let callback = Arc::new(Mutex::new(MockCallback::default())); let table_id = 1; - let future = ReactiveFuture::new(ErrorFuture, table_id, callback.clone()); + let future = ReactiveFuture::wrap(ErrorFuture, table_id, callback.clone()); let result = future.await; // Verify the error was propagated diff --git a/etl/src/conversions/bool.rs b/etl/src/conversions/bool.rs index 040d8c4a0..d71cff542 100644 --- a/etl/src/conversions/bool.rs +++ b/etl/src/conversions/bool.rs @@ -1,17 +1,17 @@ -use thiserror::Error; +use crate::bail; +use crate::error::EtlResult; +use crate::error::{ErrorKind, EtlError}; -#[derive(Debug, Error)] -pub enum ParseBoolError { - #[error("invalid input value: {0}")] - InvalidInput(String), -} - -pub fn parse_bool(s: &str) -> Result { +pub fn parse_bool(s: &str) -> EtlResult { if s == "t" { Ok(true) } else if s == "f" { Ok(false) } else { - Err(ParseBoolError::InvalidInput(s.to_string())) + bail!( + ErrorKind::InvalidData, + "Invalid boolean value", + format!("Boolean value must be 't' or 'f' (received: {s})") + ); } } diff --git a/etl/src/conversions/event.rs b/etl/src/conversions/event.rs index a2175e897..568f3e5d9 100644 --- a/etl/src/conversions/event.rs +++ b/etl/src/conversions/event.rs @@ -1,46 +1,18 @@ -use crate::conversions::Cell; -use crate::conversions::table_row::TableRow; -use crate::conversions::text::{FromTextError, TextFormatConverter}; -use crate::schema::cache::SchemaCache; -use crate::state::store::base::StateStoreError; use core::str; use postgres::schema::{ColumnSchema, TableId, TableName, TableSchema}; use postgres::types::convert_type_oid_to_type; use postgres_replication::protocol; use postgres_replication::protocol::LogicalReplicationMessage; -use std::{fmt, io, str::Utf8Error}; -use thiserror::Error; +use std::fmt; use tokio_postgres::types::PgLsn; -#[derive(Debug, Error)] -pub enum EventConversionError { - #[error("An unknown replication message type was encountered")] - UnknownReplicationMessage, - - #[error("Binary format is not supported for data conversion")] - BinaryFormatNotSupported, - - #[error("The tuple data was not found for column {0} at index {0}")] - TupleDataNotFound(String, usize), - - #[error("Missing tuple data in delete body")] - MissingTupleInDeleteBody, - - #[error("Table schema not found for table id {0}")] - MissingSchema(TableId), - - #[error("Error converting from bytes: {0}")] - FromBytes(#[from] FromTextError), - - #[error("Invalid string value encountered: {0}")] - InvalidStr(#[from] Utf8Error), - - #[error("IO error encountered: {0}")] - Io(#[from] io::Error), - - #[error("An error occurred in the state store: {0}")] - StateStore(#[from] StateStoreError), -} +use crate::conversions::Cell; +use crate::conversions::table_row::TableRow; +use crate::conversions::text::TextFormatConverter; +use crate::error::EtlError; +use crate::error::{ErrorKind, EtlResult}; +use crate::schema::cache::SchemaCache; +use crate::{bail, etl_error}; #[derive(Debug, Clone, PartialEq)] pub struct BeginEvent { @@ -102,7 +74,7 @@ impl RelationEvent { start_lsn: PgLsn, commit_lsn: PgLsn, relation_body: &protocol::RelationBody, - ) -> Result { + ) -> EtlResult { let table_name = TableName::new( relation_body.namespace()?.to_string(), relation_body.name()?.to_string(), @@ -125,9 +97,7 @@ impl RelationEvent { }) } - fn build_column_schema( - column: &protocol::Column, - ) -> Result { + fn build_column_schema(column: &protocol::Column) -> EtlResult { Ok(ColumnSchema::new( column.name()?.to_string(), convert_type_oid_to_type(column.type_id() as u32), @@ -268,30 +238,33 @@ impl From for EventType { } } -async fn get_table_schema( - schema_cache: &SchemaCache, - table_id: TableId, -) -> Result { +async fn get_table_schema(schema_cache: &SchemaCache, table_id: TableId) -> EtlResult { schema_cache .get_table_schema(&table_id) .await - .ok_or(EventConversionError::MissingSchema(table_id)) + .ok_or_else(|| { + etl_error!( + ErrorKind::MissingTableSchema, + "Table not found in the schema cache", + format!("The table schema for table {table_id} was not found in the cache") + ) + }) } fn convert_tuple_to_row( column_schemas: &[ColumnSchema], tuple_data: &[protocol::TupleData], -) -> Result { +) -> EtlResult { let mut values = Vec::with_capacity(column_schemas.len()); for (i, column_schema) in column_schemas.iter().enumerate() { // We are expecting that for each column, there is corresponding tuple data, even for null // values. let Some(tuple_data) = &tuple_data.get(i) else { - return Err(EventConversionError::TupleDataNotFound( - column_schema.name.clone(), - i, - )); + bail!( + ErrorKind::ConversionError, + "Tuple data does not contain data at the specified index" + ); }; let cell = match tuple_data { @@ -302,7 +275,10 @@ fn convert_tuple_to_row( TextFormatConverter::default_value(&column_schema.typ) } protocol::TupleData::Binary(_) => { - return Err(EventConversionError::BinaryFormatNotSupported); + bail!( + ErrorKind::ConversionError, + "Binary format is not supported in tuple data" + ); } protocol::TupleData::Text(bytes) => { let str = str::from_utf8(&bytes[..])?; @@ -321,7 +297,7 @@ async fn convert_insert_to_event( start_lsn: PgLsn, commit_lsn: PgLsn, insert_body: &protocol::InsertBody, -) -> Result { +) -> EtlResult { let table_id = insert_body.rel_id(); let table_schema = get_table_schema(schema_cache, TableId::new(table_id)).await?; @@ -343,7 +319,7 @@ async fn convert_update_to_event( start_lsn: PgLsn, commit_lsn: PgLsn, update_body: &protocol::UpdateBody, -) -> Result { +) -> EtlResult { let table_id = update_body.rel_id(); let table_schema = get_table_schema(schema_cache, TableId::new(table_id)).await?; @@ -379,7 +355,7 @@ async fn convert_delete_to_event( start_lsn: PgLsn, commit_lsn: PgLsn, delete_body: &protocol::DeleteBody, -) -> Result { +) -> EtlResult { let table_id = delete_body.rel_id(); let table_schema = get_table_schema(schema_cache, TableId::new(table_id)).await?; @@ -409,7 +385,7 @@ pub async fn convert_message_to_event( start_lsn: PgLsn, commit_lsn: PgLsn, message: &LogicalReplicationMessage, -) -> Result { +) -> EtlResult { match message { LogicalReplicationMessage::Begin(begin_body) => Ok(Event::Begin( BeginEvent::from_protocol(start_lsn, commit_lsn, begin_body), @@ -441,6 +417,10 @@ pub async fn convert_message_to_event( LogicalReplicationMessage::Origin(_) | LogicalReplicationMessage::Type(_) => { Ok(Event::Unsupported) } - _ => Err(EventConversionError::UnknownReplicationMessage), + _ => bail!( + ErrorKind::ConversionError, + "Replication message not supported", + format!("The replication message {:?} is not supported", message) + ), } } diff --git a/etl/src/conversions/hex.rs b/etl/src/conversions/hex.rs index 153858814..dfcd78a34 100644 --- a/etl/src/conversions/hex.rs +++ b/etl/src/conversions/hex.rs @@ -1,29 +1,25 @@ -use std::num::ParseIntError; +use crate::bail; +use crate::error::EtlError; +use crate::error::{ErrorKind, EtlResult}; -use thiserror::Error; - -#[derive(Debug, Error)] -pub enum ByteaHexParseError { - #[error("missing prefix '\\x'")] - InvalidPrefix, - - #[error("invalid byte")] - OddNumerOfDigits, - - #[error("parse int result: {0}")] - ParseInt(#[from] ParseIntError), -} - -pub fn from_bytea_hex(s: &str) -> Result, ByteaHexParseError> { +pub fn from_bytea_hex(s: &str) -> EtlResult> { if s.len() < 2 || &s[..2] != "\\x" { - return Err(ByteaHexParseError::InvalidPrefix); + bail!( + ErrorKind::ConversionError, + "Could not convert from bytea hex string to byte array", + "The prefix '\\x' is missing" + ); } let mut result = Vec::with_capacity((s.len() - 2) / 2); let s = &s[2..]; if s.len() % 2 != 0 { - return Err(ByteaHexParseError::OddNumerOfDigits); + bail!( + ErrorKind::ConversionError, + "Could not convert from bytea hex string to byte array", + "The number of digits is odd" + ); } for i in (0..s.len()).step_by(2) { diff --git a/etl/src/conversions/table_row.rs b/etl/src/conversions/table_row.rs index d5a0ec8df..0c0aa2d20 100644 --- a/etl/src/conversions/table_row.rs +++ b/etl/src/conversions/table_row.rs @@ -1,14 +1,12 @@ +use super::Cell; +use crate::bail; +use crate::conversions::text::TextFormatConverter; +use crate::error::EtlError; +use crate::error::{ErrorKind, EtlResult}; use core::str; use postgres::schema::ColumnSchema; -use std::str::Utf8Error; -use thiserror::Error; -use tokio_postgres::types::Type; use tracing::error; -use crate::conversions::text::TextFormatConverter; - -use super::{Cell, text::FromTextError}; - #[derive(Debug, Clone, PartialEq)] pub struct TableRow { pub values: Vec, @@ -64,32 +62,11 @@ impl prost::Message for TableRow { } } -#[derive(Debug, Error)] -pub enum TableRowConversionError { - #[error("unsupported type {0}")] - UnsupportedType(Type), - - #[error("invalid string: {0}")] - InvalidString(#[from] Utf8Error), - - #[error("mismatch in num of columns in schema and row")] - NumColsMismatch, - - #[error("unterminated row")] - UnterminatedRow, - - #[error("invalid value: {0}")] - InvalidValue(#[from] FromTextError), -} - pub struct TableRowConverter; impl TableRowConverter { // parses text produced by this code in Postgres: https://github.com/postgres/postgres/blob/263a3f5f7f508167dbeafc2aefd5835b41d77481/src/backend/commands/copyto.c#L988-L1134 - pub fn try_from( - row: &[u8], - column_schemas: &[ColumnSchema], - ) -> Result { + pub fn try_from(row: &[u8], column_schemas: &[ColumnSchema]) -> EtlResult { let mut values = Vec::with_capacity(column_schemas.len()); let row_str = str::from_utf8(row)?; @@ -139,9 +116,10 @@ impl TableRowConverter { }, None => { if !row_terminated { - return Err(TableRowConversionError::UnterminatedRow); + bail!(ErrorKind::ConversionError, "The row is not terminated"); } done = true; + break; } } @@ -149,7 +127,10 @@ impl TableRowConverter { if !done { let Some(column_schema) = column_schemas_iter.next() else { - return Err(TableRowConversionError::NumColsMismatch); + bail!( + ErrorKind::ConversionError, + "The number of columns in the schema and row is mismatched" + ); }; let value = if val_str == "\\N" { @@ -164,7 +145,7 @@ impl TableRowConverter { "error parsing column `{}` of type `{}` from text `{val_str}`", column_schema.name, column_schema.typ ); - return Err(e.into()); + return Err(e); } } }; diff --git a/etl/src/conversions/text.rs b/etl/src/conversions/text.rs index 0b43519ef..543de19d6 100644 --- a/etl/src/conversions/text.rs +++ b/etl/src/conversions/text.rs @@ -1,60 +1,17 @@ -use core::str; -use std::num::{ParseFloatError, ParseIntError}; - -use bigdecimal::ParseBigDecimalError; +use crate::error::EtlError; use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Utc}; -use thiserror::Error; +use core::str; use tokio_postgres::types::Type; use uuid::Uuid; +use crate::bail; use crate::conversions::{bool::parse_bool, hex}; +use crate::error::{ErrorKind, EtlResult}; -use super::{ArrayCell, Cell, bool::ParseBoolError, hex::ByteaHexParseError, numeric::PgNumeric}; - -#[derive(Debug, Error)] -pub enum FromTextError { - #[error("invalid bool value")] - InvalidBool(#[from] ParseBoolError), - - #[error("invalid int value")] - InvalidInt(#[from] ParseIntError), - - #[error("invalid float value")] - InvalidFloat(#[from] ParseFloatError), - - #[error("invalid numeric: {0}")] - InvalidNumeric(#[from] ParseBigDecimalError), - - #[error("invalid bytea: {0}")] - InvalidBytea(#[from] ByteaHexParseError), - - #[error("invalid uuid: {0}")] - InvalidUuid(#[from] uuid::Error), - - #[error("invalid json: {0}")] - InvalidJson(#[from] serde_json::Error), - - #[error("invalid timestamp: {0} ")] - InvalidTimestamp(#[from] chrono::ParseError), - - #[error("invalid array: {0}")] - InvalidArray(#[from] ArrayParseError), - - #[error("row get error: {0:?}")] - RowGetError(#[from] Box), -} +use super::{ArrayCell, Cell, numeric::PgNumeric}; pub struct TextFormatConverter; -#[derive(Debug, Error)] -pub enum ArrayParseError { - #[error("input too short")] - InputTooShort, - - #[error("missing braces")] - MissingBraces, -} - impl TextFormatConverter { pub fn default_value(typ: &Type) -> Cell { match *typ { @@ -108,7 +65,7 @@ impl TextFormatConverter { } } - pub fn try_from_str(typ: &Type, str: &str) -> Result { + pub fn try_from_str(typ: &Type, str: &str) -> EtlResult { match *typ { Type::BOOL => Ok(Cell::Bool(parse_bool(str)?)), Type::BOOL_ARRAY => TextFormatConverter::parse_array( @@ -266,17 +223,20 @@ impl TextFormatConverter { } } - fn parse_array(str: &str, mut parse: P, m: M) -> Result + fn parse_array(str: &str, mut parse: P, m: M) -> EtlResult where - P: FnMut(&str) -> Result, FromTextError>, + P: FnMut(&str) -> EtlResult>, M: FnOnce(Vec>) -> ArrayCell, { if str.len() < 2 { - return Err(ArrayParseError::InputTooShort.into()); + bail!(ErrorKind::ConversionError, "The array input is too short"); } if !str.starts_with('{') || !str.ends_with('}') { - return Err(ArrayParseError::MissingBraces.into()); + bail!( + ErrorKind::ConversionError, + "The array input is missing braces" + ); } let mut res = vec![]; diff --git a/etl/src/destination/base.rs b/etl/src/destination/base.rs index 1adda4fa9..860fd875b 100644 --- a/etl/src/destination/base.rs +++ b/etl/src/destination/base.rs @@ -1,25 +1,13 @@ use postgres::schema::{TableId, TableSchema}; use std::future::Future; -use thiserror::Error; use crate::conversions::event::Event; use crate::conversions::table_row::TableRow; -#[cfg(feature = "bigquery")] -use crate::destination::bigquery::BigQueryDestinationError; +use crate::error::EtlResult; use crate::schema::cache::SchemaCache; -#[derive(Debug, Error)] -pub enum DestinationError { - #[cfg(feature = "bigquery")] - #[error(transparent)] - BigQuery(#[from] BigQueryDestinationError), -} - pub trait Destination { - fn inject( - &self, - _schema_cache: SchemaCache, - ) -> impl Future> + Send { + fn inject(&self, _schema_cache: SchemaCache) -> impl Future> + Send { // By default, the injection code is a noop, since not all destinations need dependencies // to be injected. async move { Ok(()) } @@ -28,20 +16,15 @@ pub trait Destination { fn write_table_schema( &self, table_schema: TableSchema, - ) -> impl Future> + Send; + ) -> impl Future> + Send; - fn load_table_schemas( - &self, - ) -> impl Future, DestinationError>> + Send; + fn load_table_schemas(&self) -> impl Future>> + Send; fn write_table_rows( &self, table_id: TableId, table_rows: Vec, - ) -> impl Future> + Send; + ) -> impl Future> + Send; - fn write_events( - &self, - events: Vec, - ) -> impl Future> + Send; + fn write_events(&self, events: Vec) -> impl Future> + Send; } diff --git a/etl/src/destination/bigquery.rs b/etl/src/destination/bigquery.rs index b7d3ba4b8..9154b2e20 100644 --- a/etl/src/destination/bigquery.rs +++ b/etl/src/destination/bigquery.rs @@ -4,16 +4,16 @@ use postgres::schema::{ColumnSchema, TableId, TableName, TableSchema}; use std::collections::HashMap; use std::ops::Deref; use std::sync::{Arc, LazyLock}; -use thiserror::Error; use tokio::sync::Mutex; use tokio_postgres::types::{PgLsn, Type}; use tracing::{debug, info, warn}; -use crate::clients::bigquery::{BigQueryClient, BigQueryClientError, BigQueryOperationType}; +use crate::clients::bigquery::{BigQueryClient, BigQueryOperationType}; use crate::conversions::Cell; use crate::conversions::event::{Event, TruncateEvent}; use crate::conversions::table_row::TableRow; -use crate::destination::base::{Destination, DestinationError}; +use crate::destination::base::Destination; +use crate::error::{ErrorKind, EtlError, EtlResult}; use crate::schema::cache::SchemaCache; /// Table name for storing ETL table schema metadata in BigQuery. @@ -71,29 +71,6 @@ static ETL_TABLE_COLUMNS_COLUMNS: LazyLock> = LazyLock::new(|| ] }); -/// Errors that can occur when using [`BigQueryDestination`]. -/// -/// This error type covers BigQuery client failures, schema cache issues, -/// missing table schemas, and serialization problems. -#[derive(Debug, Error)] -pub enum BigQueryDestinationError { - /// Wraps errors from the underlying [`BigQueryClient`]. - #[error("An error occurred with the BigQuery client: {0}")] - BigQueryClient(#[from] BigQueryClientError), - - /// The requested table schema was not found in the schema cache. - #[error("The table schema for table id {0} was not found in the schema cache")] - MissingTableSchema(TableId), - - /// No schema cache has been injected into this destination instance. - #[error("The schema cache was not set on the destination")] - MissingSchemaCache, - - /// JSON serialization failed while processing table schema data. - #[error("Failed to serialize table schema: {0}")] - SerializationError(#[from] serde_json::Error), -} - /// Internal state for [`BigQueryDestination`] wrapped in `Arc>`. /// /// Contains the BigQuery client, dataset configuration, and injected schema cache. @@ -109,7 +86,7 @@ impl Inner { /// Ensures the ETL metadata tables exist in BigQuery. /// /// Creates `etl_table_schemas` and `etl_table_columns` tables if they don't exist. - async fn ensure_schema_tables_exist(&self) -> Result<(), BigQueryDestinationError> { + async fn ensure_schema_tables_exist(&self) -> EtlResult<()> { // Create etl_table_schemas table - use ColumnSchema for compatibility self.client .create_table_if_missing( @@ -158,7 +135,7 @@ impl BigQueryDestination { dataset_id: String, sa_key: &str, max_staleness_mins: Option, - ) -> Result { + ) -> EtlResult { let client = BigQueryClient::new_with_key_path(project_id, sa_key).await?; let inner = Inner { client, @@ -181,7 +158,7 @@ impl BigQueryDestination { dataset_id: String, sa_key: &str, max_staleness_mins: Option, - ) -> Result { + ) -> EtlResult { let client = BigQueryClient::new_with_key(project_id, sa_key).await?; let inner = Inner { client, @@ -202,16 +179,25 @@ impl BigQueryDestination { inner: &I, table_id: &TableId, use_cdc_sequence_column: bool, - ) -> Result<(String, TableDescriptor), BigQueryDestinationError> { + ) -> EtlResult<(String, TableDescriptor)> { let schema_cache = inner .schema_cache .as_ref() - .ok_or(BigQueryDestinationError::MissingSchemaCache)? + .ok_or_else(|| { + EtlError::from(( + ErrorKind::ConfigError, + "The schema cache was not set on the destination", + )) + })? .lock_inner() .await; - let table_schema = schema_cache - .get_table_schema_ref(table_id) - .ok_or(BigQueryDestinationError::MissingTableSchema(*table_id))?; + let table_schema = schema_cache.get_table_schema_ref(table_id).ok_or_else(|| { + EtlError::from(( + ErrorKind::DestinationSchemaError, + "Table schema not found in schema cache", + format!("table_id: {table_id}"), + )) + })?; let table_id = table_schema.name.as_bigquery_table_id(); let table_descriptor = BigQueryClient::column_schemas_to_table_descriptor( @@ -225,10 +211,7 @@ impl BigQueryDestination { /// Writes a table schema to BigQuery, creating the data table and storing metadata. /// /// This method creates the actual data table and inserts schema information into the ETL metadata tables. - async fn write_table_schema( - &self, - table_schema: TableSchema, - ) -> Result<(), BigQueryDestinationError> { + async fn write_table_schema(&self, table_schema: TableSchema) -> EtlResult<()> { let mut inner = self.inner.lock().await; let dataset_id = inner.dataset_id.clone(); @@ -286,13 +269,14 @@ impl BigQueryDestination { } debug!("wrote table schema for table '{}'", table_schema.name); + Ok(()) } /// Loads all table schemas from BigQuery ETL metadata tables. /// /// Reconstructs [`TableSchema`] objects by joining data from schema and column metadata tables. - async fn load_table_schemas(&self) -> Result, BigQueryDestinationError> { + async fn load_table_schemas(&self) -> EtlResult> { let inner = self.inner.lock().await; // First check if schema tables exist @@ -408,7 +392,7 @@ impl BigQueryDestination { &self, table_id: TableId, mut table_rows: Vec, - ) -> Result<(), BigQueryDestinationError> { + ) -> EtlResult<()> { let mut inner = self.inner.lock().await; // We do not use the sequence column for table rows, since we assume that table rows are always @@ -434,7 +418,7 @@ impl BigQueryDestination { /// /// Groups events by type, handles inserts/updates/deletes via streaming, and processes truncates separately. /// Adds sequence numbers to ensure proper ordering of events with the same system time. - async fn write_events(&self, events: Vec) -> Result<(), BigQueryDestinationError> { + async fn write_events(&self, events: Vec) -> EtlResult<()> { let mut event_iter = events.into_iter().peekable(); while event_iter.peek().is_some() { @@ -541,10 +525,7 @@ impl BigQueryDestination { /// /// Maps PostgreSQL table OIDs to BigQuery table names and issues truncate commands. #[allow(dead_code)] - async fn process_truncate_events( - &self, - truncate_events: Vec, - ) -> Result<(), BigQueryDestinationError> { + async fn process_truncate_events(&self, truncate_events: Vec) -> EtlResult<()> { let inner = self.inner.lock().await; for truncate_event in truncate_events { @@ -553,7 +534,12 @@ impl BigQueryDestination { let schema_cache = inner .schema_cache .as_ref() - .ok_or(BigQueryDestinationError::MissingSchemaCache)? + .ok_or_else(|| { + EtlError::from(( + ErrorKind::ConfigError, + "The schema cache was not set on the destination", + )) + })? .lock_inner() .await; @@ -648,7 +634,7 @@ impl BigQueryDestination { /// /// Used when reconstructing schemas from BigQuery metadata tables. Falls back to `TEXT` for unknown types. #[allow(clippy::result_large_err)] - fn string_to_postgres_type(type_str: &str) -> Result { + fn string_to_postgres_type(type_str: &str) -> EtlResult { match type_str { "BOOL" => Ok(Type::BOOL), "CHAR" => Ok(Type::CHAR), @@ -697,20 +683,20 @@ impl BigQueryDestination { } impl Destination for BigQueryDestination { - async fn inject(&self, schema_cache: SchemaCache) -> Result<(), DestinationError> { + async fn inject(&self, schema_cache: SchemaCache) -> EtlResult<()> { let mut inner = self.inner.lock().await; inner.schema_cache = Some(schema_cache); Ok(()) } - async fn write_table_schema(&self, table_schema: TableSchema) -> Result<(), DestinationError> { + async fn write_table_schema(&self, table_schema: TableSchema) -> EtlResult<()> { self.write_table_schema(table_schema).await?; Ok(()) } - async fn load_table_schemas(&self) -> Result, DestinationError> { + async fn load_table_schemas(&self) -> EtlResult> { let table_schemas = self.load_table_schemas().await?; Ok(table_schemas) @@ -720,13 +706,13 @@ impl Destination for BigQueryDestination { &self, table_id: TableId, table_rows: Vec, - ) -> Result<(), DestinationError> { + ) -> EtlResult<()> { self.write_table_rows(table_id, table_rows).await?; Ok(()) } - async fn write_events(&self, events: Vec) -> Result<(), DestinationError> { + async fn write_events(&self, events: Vec) -> EtlResult<()> { self.write_events(events).await?; Ok(()) diff --git a/etl/src/destination/memory.rs b/etl/src/destination/memory.rs index 860509db7..4ed5f5fc1 100644 --- a/etl/src/destination/memory.rs +++ b/etl/src/destination/memory.rs @@ -5,7 +5,8 @@ use tracing::info; use crate::conversions::event::Event; use crate::conversions::table_row::TableRow; -use crate::destination::base::{Destination, DestinationError}; +use crate::destination::base::Destination; +use crate::error::EtlResult; #[derive(Debug)] struct Inner { @@ -40,7 +41,7 @@ impl Default for MemoryDestination { } impl Destination for MemoryDestination { - async fn write_table_schema(&self, table_schema: TableSchema) -> Result<(), DestinationError> { + async fn write_table_schema(&self, table_schema: TableSchema) -> EtlResult<()> { let mut inner = self.inner.lock().await; info!("writing table schema:"); info!("{:?}", table_schema); @@ -49,7 +50,7 @@ impl Destination for MemoryDestination { Ok(()) } - async fn load_table_schemas(&self) -> Result, DestinationError> { + async fn load_table_schemas(&self) -> EtlResult> { let inner = self.inner.lock().await; let schemas = inner.table_schemas.to_vec(); info!("loaded {} table schemas:", schemas.len()); @@ -62,7 +63,7 @@ impl Destination for MemoryDestination { &self, table_id: TableId, table_rows: Vec, - ) -> Result<(), DestinationError> { + ) -> EtlResult<()> { let mut inner = self.inner.lock().await; info!("writing a batch of {} table rows:", table_rows.len()); for table_row in &table_rows { @@ -73,7 +74,7 @@ impl Destination for MemoryDestination { Ok(()) } - async fn write_events(&self, events: Vec) -> Result<(), DestinationError> { + async fn write_events(&self, events: Vec) -> EtlResult<()> { let mut inner = self.inner.lock().await; info!("writing a batch of {} events:", events.len()); for event in &events { diff --git a/etl/src/error.rs b/etl/src/error.rs new file mode 100644 index 000000000..9304eb659 --- /dev/null +++ b/etl/src/error.rs @@ -0,0 +1,877 @@ +use std::error; +use std::fmt; + +/// Convenient result type for ETL operations using [`EtlError`] as the error type. +/// +/// This type alias reduces boilerplate when working with fallible ETL operations. +/// Most ETL functions return this type. +pub type EtlResult = Result; + +/// Main error type for ETL operations. +/// +/// [`EtlError`] provides a comprehensive error system that can represent single errors, +/// errors with additional detail, or multiple aggregated errors. The design allows for +/// rich error information while maintaining ergonomic usage patterns. +#[derive(Debug)] +pub struct EtlError { + repr: ErrorRepr, +} + +/// Internal representation of error data. +/// +/// This enum supports different error patterns while maintaining a unified interface. +/// Users should not interact with this type directly but use [`EtlError`] methods instead. +#[derive(Debug)] +pub enum ErrorRepr { + /// Error with kind and static description + WithDescription(ErrorKind, &'static str), + /// Error with kind, static description, and dynamic detail + WithDescriptionAndDetail(ErrorKind, &'static str, String), + /// Multiple aggregated errors + Many(Vec), +} + +/// Specific categories of errors that can occur during ETL operations. +/// +/// This enum provides granular error classification to enable appropriate error handling +/// strategies. Error kinds are organized by functional area and failure mode. +#[derive(PartialEq, Eq, Copy, Clone, Debug)] +#[non_exhaustive] +pub enum ErrorKind { + /// Database connection failed or resource limitations + /// + /// Used for PostgreSQL connection errors (08xxx), resource errors (53xxx), + /// and general connectivity issues. + ConnectionFailed, + /// Query execution failed + /// + /// Used for PostgreSQL syntax errors (42xxx), BigQuery response errors, + /// and general SQL execution failures. + QueryFailed, + /// Source schema mismatch or validation error + SourceSchemaError, + /// Destination schema mismatch or validation error + /// + /// Used for PostgreSQL schema object not found errors (42xxx) + /// and destination schema mismatches. + DestinationSchemaError, + /// Missing table schema + MissingTableSchema, + /// Data type conversion error + /// + /// Used for PostgreSQL data conversion errors (22xxx), BigQuery column type mismatches, + /// UTF-8 conversion failures, and numeric parsing errors. + ConversionError, + /// Configuration error + /// + /// Used for BigQuery invalid metadata values and general configuration issues. + ConfigError, + /// Network or I/O error + /// + /// Used for PostgreSQL system errors (58xxx), BigQuery transport/request errors, + /// JSON I/O errors, and general I/O failures. + IoError, + /// Serialization error + /// + /// Used for BigQuery JSON serialization errors and data encoding failures. + SerializationError, + /// Deserialization error + /// + /// Used for JSON syntax/data/EOF errors and data decoding failures. + DeserializationError, + /// Encryption/decryption error + EncryptionError, + /// Authentication failed + /// + /// Used for PostgreSQL authentication errors (28xxx), + /// BigQuery authentication errors, and credential failures. + AuthenticationError, + /// Invalid state error + /// + /// Used for PostgreSQL transaction errors (40xxx, 25xxx), + /// BigQuery result set positioning errors, and state inconsistencies. + InvalidState, + /// Invalid data + /// + /// Used for BigQuery invalid column index/name errors, + /// UUID parsing failures, and malformed data. + InvalidData, + /// Data validation error + /// + /// Used for PostgreSQL constraint violations (23xxx) + /// and data integrity validation failures. + ValidationError, + /// Apply worker error + ApplyWorkerPanic, + /// Table sync worker error + TableSyncWorkerPanic, + /// Table sync worker error + TableSyncWorkerCaughtError, + /// Destination-specific error + /// + /// Used for BigQuery gRPC status errors, row errors, + /// and destination-specific failures. + DestinationError, + /// Replication slot not found + ReplicationSlotNotFound, + /// Replication slot already exists + ReplicationSlotAlreadyExists, + /// Replication slot could not be created + ReplicationSlotNotCreated, + /// Unknown error + Unknown, +} + +impl EtlError { + /// Creates an [`EtlError`] containing multiple aggregated errors. + /// + /// This is useful when multiple operations fail and you want to report all failures + /// rather than just the first one. + pub fn many(errors: Vec) -> EtlError { + EtlError { + repr: ErrorRepr::Many(errors), + } + } + + /// Returns the [`ErrorKind`] of this error. + /// + /// For multiple errors, returns the kind of the first error or [`ErrorKind::Unknown`] + /// if the error list is empty. + pub fn kind(&self) -> ErrorKind { + match self.repr { + ErrorRepr::WithDescription(kind, _) + | ErrorRepr::WithDescriptionAndDetail(kind, _, _) => kind, + ErrorRepr::Many(ref errors) => errors + .first() + .map(|err| err.kind()) + .unwrap_or(ErrorKind::Unknown), + } + } + + /// Returns all [`ErrorKind`]s present in this error. + /// + /// For single errors, returns a vector with one element. For multiple errors, + /// returns a flattened vector of all error kinds. + pub fn kinds(&self) -> Vec { + match self.repr { + ErrorRepr::WithDescription(kind, _) + | ErrorRepr::WithDescriptionAndDetail(kind, _, _) => vec![kind], + ErrorRepr::Many(ref errors) => errors + .iter() + .flat_map(|err| err.kinds()) + .collect::>(), + } + } + + /// Returns the detailed error information if available. + /// + /// For multiple errors, returns the detail of the first error that has one. + /// Returns [`None`] if no detailed information is available. + pub fn detail(&self) -> Option<&str> { + match self.repr { + ErrorRepr::WithDescriptionAndDetail(_, _, ref detail) => Some(detail.as_str()), + ErrorRepr::Many(ref errors) => { + // For multiple errors, return the detail of the first error that has one + errors.iter().find_map(|e| e.detail()) + } + _ => None, + } + } +} + +impl PartialEq for EtlError { + fn eq(&self, other: &EtlError) -> bool { + match (&self.repr, &other.repr) { + (ErrorRepr::WithDescription(kind_a, _), ErrorRepr::WithDescription(kind_b, _)) => { + kind_a == kind_b + } + ( + ErrorRepr::WithDescriptionAndDetail(kind_a, _, _), + ErrorRepr::WithDescriptionAndDetail(kind_b, _, _), + ) => kind_a == kind_b, + (ErrorRepr::Many(errors_a), ErrorRepr::Many(errors_b)) => { + errors_a.len() == errors_b.len() + && errors_a.iter().zip(errors_b.iter()).all(|(a, b)| a == b) + } + _ => false, + } + } +} + +impl fmt::Display for EtlError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + match self.repr { + ErrorRepr::WithDescription(kind, desc) => { + fmt::Debug::fmt(&kind, f)?; + f.write_str(": ")?; + desc.fmt(f)?; + + Ok(()) + } + ErrorRepr::WithDescriptionAndDetail(kind, desc, ref detail) => { + fmt::Debug::fmt(&kind, f)?; + f.write_str(": ")?; + desc.fmt(f)?; + f.write_str(" -> ")?; + detail.fmt(f)?; + + Ok(()) + } + ErrorRepr::Many(ref errors) => { + if errors.is_empty() { + write!(f, "Multiple errors occurred (empty)")?; + } else if errors.len() == 1 { + // If there's only one error, just display it directly + errors[0].fmt(f)?; + } else { + write!(f, "Multiple errors occurred ({} total):", errors.len())?; + for (i, error) in errors.iter().enumerate() { + write!(f, "\n {}: {}", i + 1, error)?; + } + } + Ok(()) + } + } + } +} + +impl error::Error for EtlError {} + +// Ergonomic constructors following Redis pattern + +/// Creates an [`EtlError`] from an error kind and static description. +impl From<(ErrorKind, &'static str)> for EtlError { + fn from((kind, desc): (ErrorKind, &'static str)) -> EtlError { + EtlError { + repr: ErrorRepr::WithDescription(kind, desc), + } + } +} + +/// Creates an [`EtlError`] from an error kind, static description, and dynamic detail. +impl From<(ErrorKind, &'static str, String)> for EtlError { + fn from((kind, desc, detail): (ErrorKind, &'static str, String)) -> EtlError { + EtlError { + repr: ErrorRepr::WithDescriptionAndDetail(kind, desc, detail), + } + } +} + +/// Creates an [`EtlError`] from a vector of errors for aggregation. +impl From> for EtlError +where + E: Into, +{ + fn from(errors: Vec) -> EtlError { + EtlError { + repr: ErrorRepr::Many(errors.into_iter().map(Into::into).collect()), + } + } +} + +// Common standard library error conversions + +/// Converts [`std::io::Error`] to [`EtlError`] with [`ErrorKind::IoError`]. +impl From for EtlError { + fn from(err: std::io::Error) -> EtlError { + EtlError { + repr: ErrorRepr::WithDescriptionAndDetail( + ErrorKind::IoError, + "I/O error occurred", + err.to_string(), + ), + } + } +} + +/// Converts [`serde_json::Error`] to [`EtlError`] with appropriate error kind. +/// +/// Maps to [`ErrorKind::SerializationError`] for serialization failures and +/// [`ErrorKind::DeserializationError`] for deserialization failures based on error classification. +impl From for EtlError { + fn from(err: serde_json::Error) -> EtlError { + let (kind, description) = match err.classify() { + serde_json::error::Category::Io => (ErrorKind::IoError, "JSON I/O operation failed"), + serde_json::error::Category::Syntax | serde_json::error::Category::Data => ( + ErrorKind::DeserializationError, + "JSON deserialization failed", + ), + serde_json::error::Category::Eof => ( + ErrorKind::DeserializationError, + "JSON deserialization failed", + ), + }; + + EtlError { + repr: ErrorRepr::WithDescriptionAndDetail(kind, description, err.to_string()), + } + } +} + +/// Converts [`std::str::Utf8Error`] to [`EtlError`] with [`ErrorKind::ConversionError`]. +impl From for EtlError { + fn from(err: std::str::Utf8Error) -> EtlError { + EtlError { + repr: ErrorRepr::WithDescriptionAndDetail( + ErrorKind::ConversionError, + "UTF-8 conversion failed", + err.to_string(), + ), + } + } +} + +/// Converts [`std::string::FromUtf8Error`] to [`EtlError`] with [`ErrorKind::ConversionError`]. +impl From for EtlError { + fn from(err: std::string::FromUtf8Error) -> EtlError { + EtlError { + repr: ErrorRepr::WithDescriptionAndDetail( + ErrorKind::ConversionError, + "UTF-8 string conversion failed", + err.to_string(), + ), + } + } +} + +/// Converts [`std::num::ParseIntError`] to [`EtlError`] with [`ErrorKind::ConversionError`]. +impl From for EtlError { + fn from(err: std::num::ParseIntError) -> EtlError { + EtlError { + repr: ErrorRepr::WithDescriptionAndDetail( + ErrorKind::ConversionError, + "Integer parsing failed", + err.to_string(), + ), + } + } +} + +/// Converts [`std::num::ParseFloatError`] to [`EtlError`] with [`ErrorKind::ConversionError`]. +impl From for EtlError { + fn from(err: std::num::ParseFloatError) -> EtlError { + EtlError { + repr: ErrorRepr::WithDescriptionAndDetail( + ErrorKind::ConversionError, + "Float parsing failed", + err.to_string(), + ), + } + } +} + +// PostgreSQL-specific error conversions + +/// Converts [`tokio_postgres::Error`] to [`EtlError`] with appropriate error kind. +/// +/// Maps errors based on PostgreSQL SQLSTATE codes to provide granular error classification +/// for better error handling in ETL operations. +impl From for EtlError { + fn from(err: tokio_postgres::Error) -> EtlError { + let (kind, description) = match err.code() { + Some(sqlstate) => { + use tokio_postgres::error::SqlState; + + match *sqlstate { + // Connection errors (08xxx) + SqlState::CONNECTION_EXCEPTION + | SqlState::CONNECTION_DOES_NOT_EXIST + | SqlState::CONNECTION_FAILURE + | SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION + | SqlState::SQLSERVER_REJECTED_ESTABLISHMENT_OF_SQLCONNECTION => { + (ErrorKind::ConnectionFailed, "PostgreSQL connection error") + } + + // Authentication errors (28xxx) + SqlState::INVALID_AUTHORIZATION_SPECIFICATION | SqlState::INVALID_PASSWORD => ( + ErrorKind::AuthenticationError, + "PostgreSQL authentication failed", + ), + + // Data integrity violations (23xxx) + SqlState::INTEGRITY_CONSTRAINT_VIOLATION + | SqlState::NOT_NULL_VIOLATION + | SqlState::FOREIGN_KEY_VIOLATION + | SqlState::UNIQUE_VIOLATION + | SqlState::CHECK_VIOLATION => ( + ErrorKind::ValidationError, + "PostgreSQL constraint violation", + ), + + // Data conversion errors (22xxx) + SqlState::DATA_EXCEPTION + | SqlState::INVALID_TEXT_REPRESENTATION + | SqlState::INVALID_DATETIME_FORMAT + | SqlState::NUMERIC_VALUE_OUT_OF_RANGE + | SqlState::DIVISION_BY_ZERO => ( + ErrorKind::ConversionError, + "PostgreSQL data conversion error", + ), + + // Schema/object not found errors (42xxx) + SqlState::UNDEFINED_TABLE + | SqlState::UNDEFINED_COLUMN + | SqlState::UNDEFINED_FUNCTION + | SqlState::UNDEFINED_SCHEMA => ( + ErrorKind::DestinationSchemaError, + "PostgreSQL schema object not found", + ), + + // Syntax and access errors (42xxx) + SqlState::SYNTAX_ERROR + | SqlState::SYNTAX_ERROR_OR_ACCESS_RULE_VIOLATION + | SqlState::INSUFFICIENT_PRIVILEGE => { + (ErrorKind::QueryFailed, "PostgreSQL syntax or access error") + } + + // Resource errors (53xxx) + SqlState::INSUFFICIENT_RESOURCES + | SqlState::OUT_OF_MEMORY + | SqlState::TOO_MANY_CONNECTIONS => ( + ErrorKind::ConnectionFailed, + "PostgreSQL resource limitation", + ), + + // Transaction errors (40xxx, 25xxx) + SqlState::TRANSACTION_ROLLBACK + | SqlState::T_R_SERIALIZATION_FAILURE + | SqlState::T_R_DEADLOCK_DETECTED + | SqlState::INVALID_TRANSACTION_STATE => { + (ErrorKind::InvalidState, "PostgreSQL transaction error") + } + + // System errors (58xxx, XX xxx) + SqlState::SYSTEM_ERROR | SqlState::IO_ERROR | SqlState::INTERNAL_ERROR => { + (ErrorKind::IoError, "PostgreSQL system error") + } + + // Default for other SQL states + _ => (ErrorKind::QueryFailed, "PostgreSQL query failed"), + } + } + // No SQL state means connection issue + None => (ErrorKind::ConnectionFailed, "PostgreSQL connection failed"), + }; + + EtlError { + repr: ErrorRepr::WithDescriptionAndDetail(kind, description, err.to_string()), + } + } +} + +/// Converts [`rustls::Error`] to [`EtlError`] with [`ErrorKind::EncryptionError`]. +impl From for EtlError { + fn from(err: rustls::Error) -> EtlError { + EtlError { + repr: ErrorRepr::WithDescriptionAndDetail( + ErrorKind::EncryptionError, + "TLS configuration failed", + err.to_string(), + ), + } + } +} + +/// Converts [`uuid::Error`] to [`EtlError`] with [`ErrorKind::InvalidData`]. +impl From for EtlError { + fn from(err: uuid::Error) -> EtlError { + EtlError { + repr: ErrorRepr::WithDescriptionAndDetail( + ErrorKind::InvalidData, + "UUID parsing failed", + err.to_string(), + ), + } + } +} + +/// Converts [`chrono::ParseError`] to [`EtlError`] with [`ErrorKind::ConversionError`]. +impl From for EtlError { + fn from(err: chrono::ParseError) -> EtlError { + EtlError { + repr: ErrorRepr::WithDescriptionAndDetail( + ErrorKind::ConversionError, + "Chrono parse failed", + err.to_string(), + ), + } + } +} + +/// Converts [`bigdecimal::ParseBigDecimalError`] to [`EtlError`] with [`ErrorKind::ConversionError`]. +impl From for EtlError { + fn from(err: bigdecimal::ParseBigDecimalError) -> EtlError { + EtlError { + repr: ErrorRepr::WithDescriptionAndDetail( + ErrorKind::ConversionError, + "BigDecimal parsing failed", + err.to_string(), + ), + } + } +} + +// SQLx error conversion + +/// Converts [`sqlx::Error`] to [`EtlError`] with appropriate error kind. +/// +/// Maps database errors to [`ErrorKind::QueryFailed`], I/O errors to [`ErrorKind::IoError`], +/// and connection pool errors to [`ErrorKind::ConnectionFailed`]. +impl From for EtlError { + fn from(err: sqlx::Error) -> EtlError { + let kind = match &err { + sqlx::Error::Database(_) => ErrorKind::QueryFailed, + sqlx::Error::Io(_) => ErrorKind::IoError, + sqlx::Error::PoolClosed | sqlx::Error::PoolTimedOut => ErrorKind::ConnectionFailed, + _ => ErrorKind::QueryFailed, + }; + + EtlError { + repr: ErrorRepr::WithDescriptionAndDetail( + kind, + "Database operation failed", + err.to_string(), + ), + } + } +} + +// BigQuery error conversions (feature-gated) + +/// Converts [`gcp_bigquery_client::error::BQError`] to [`EtlError`] with appropriate error kind. +/// +/// Maps errors based on their specific type for better error classification and handling. +#[cfg(feature = "bigquery")] +impl From for EtlError { + fn from(err: gcp_bigquery_client::error::BQError) -> EtlError { + use gcp_bigquery_client::error::BQError; + + let (kind, description) = match &err { + // Authentication related errors + BQError::InvalidServiceAccountKey(_) => ( + ErrorKind::AuthenticationError, + "Invalid BigQuery service account key", + ), + BQError::InvalidServiceAccountAuthenticator(_) => ( + ErrorKind::AuthenticationError, + "Invalid BigQuery service account authenticator", + ), + BQError::InvalidInstalledFlowAuthenticator(_) => ( + ErrorKind::AuthenticationError, + "Invalid BigQuery installed flow authenticator", + ), + BQError::InvalidApplicationDefaultCredentialsAuthenticator(_) => ( + ErrorKind::AuthenticationError, + "Invalid BigQuery application default credentials", + ), + BQError::InvalidAuthorizedUserAuthenticator(_) => ( + ErrorKind::AuthenticationError, + "Invalid BigQuery authorized user authenticator", + ), + BQError::AuthError(_) => ( + ErrorKind::AuthenticationError, + "BigQuery authentication error", + ), + BQError::YupAuthError(_) => ( + ErrorKind::AuthenticationError, + "BigQuery OAuth authentication error", + ), + BQError::NoToken => ( + ErrorKind::AuthenticationError, + "BigQuery authentication token missing", + ), + + // Network and transport errors + BQError::RequestError(_) => (ErrorKind::IoError, "BigQuery request failed"), + BQError::TonicTransportError(_) => (ErrorKind::IoError, "BigQuery transport error"), + + // Query and data errors + BQError::ResponseError { .. } => (ErrorKind::QueryFailed, "BigQuery response error"), + BQError::NoDataAvailable => ( + ErrorKind::InvalidState, + "BigQuery result set positioning error", + ), + BQError::InvalidColumnIndex { .. } => { + (ErrorKind::InvalidData, "BigQuery invalid column index") + } + BQError::InvalidColumnName { .. } => { + (ErrorKind::InvalidData, "BigQuery invalid column name") + } + BQError::InvalidColumnType { .. } => { + (ErrorKind::ConversionError, "BigQuery column type mismatch") + } + + // Serialization errors + BQError::SerializationError(_) => ( + ErrorKind::SerializationError, + "BigQuery JSON serialization error", + ), + + // gRPC errors + BQError::TonicInvalidMetadataValueError(_) => { + (ErrorKind::ConfigError, "BigQuery invalid metadata value") + } + BQError::TonicStatusError(_) => { + (ErrorKind::DestinationError, "BigQuery gRPC status error") + } + }; + + EtlError { + repr: ErrorRepr::WithDescriptionAndDetail(kind, description, err.to_string()), + } + } +} + +/// Converts BigQuery row errors to [`EtlError`] with [`ErrorKind::DestinationError`]. +#[cfg(feature = "bigquery")] +impl From for EtlError { + fn from(err: gcp_bigquery_client::google::cloud::bigquery::storage::v1::RowError) -> EtlError { + EtlError { + repr: ErrorRepr::WithDescriptionAndDetail( + ErrorKind::DestinationError, + "BigQuery row error", + format!("{err:?}"), + ), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{bail, etl_error}; + + #[test] + fn test_simple_error_creation() { + let err = EtlError::from((ErrorKind::ConnectionFailed, "Database connection failed")); + assert_eq!(err.kind(), ErrorKind::ConnectionFailed); + assert_eq!(err.detail(), None); + assert_eq!(err.kinds(), vec![ErrorKind::ConnectionFailed]); + } + + #[test] + fn test_error_with_detail() { + let err = EtlError::from(( + ErrorKind::QueryFailed, + "SQL query execution failed", + "Table 'users' doesn't exist".to_string(), + )); + assert_eq!(err.kind(), ErrorKind::QueryFailed); + assert_eq!(err.detail(), Some("Table 'users' doesn't exist")); + assert_eq!(err.kinds(), vec![ErrorKind::QueryFailed]); + } + + #[test] + fn test_multiple_errors() { + let errors = vec![ + EtlError::from((ErrorKind::ValidationError, "Invalid schema")), + EtlError::from((ErrorKind::ConversionError, "Type mismatch")), + EtlError::from((ErrorKind::IoError, "Connection timeout")), + ]; + let multi_err = EtlError::many(errors); + + assert_eq!(multi_err.kind(), ErrorKind::ValidationError); + assert_eq!( + multi_err.kinds(), + vec![ + ErrorKind::ValidationError, + ErrorKind::ConversionError, + ErrorKind::IoError + ] + ); + assert_eq!(multi_err.detail(), None); + } + + #[test] + fn test_multiple_errors_with_detail() { + let errors = vec![ + EtlError::from(( + ErrorKind::ValidationError, + "Invalid schema", + "Missing required field".to_string(), + )), + EtlError::from((ErrorKind::ConversionError, "Type mismatch")), + ]; + let multi_err = EtlError::many(errors); + + assert_eq!(multi_err.detail(), Some("Missing required field")); + } + + #[test] + fn test_from_vector() { + let errors = vec![ + EtlError::from((ErrorKind::ValidationError, "Error 1")), + EtlError::from((ErrorKind::ConversionError, "Error 2")), + ]; + let multi_err = EtlError::from(errors); + assert_eq!(multi_err.kinds().len(), 2); + } + + #[test] + fn test_empty_multiple_errors() { + let multi_err = EtlError::many(vec![]); + assert_eq!(multi_err.kind(), ErrorKind::Unknown); + assert_eq!(multi_err.kinds(), vec![]); + assert_eq!(multi_err.detail(), None); + } + + #[test] + fn test_error_equality() { + let err1 = EtlError::from((ErrorKind::ConnectionFailed, "Connection failed")); + let err2 = EtlError::from((ErrorKind::ConnectionFailed, "Connection failed")); + let err3 = EtlError::from((ErrorKind::QueryFailed, "Query failed")); + + assert_eq!(err1, err2); + assert_ne!(err1, err3); + } + + #[test] + fn test_error_display() { + let err = EtlError::from((ErrorKind::ConnectionFailed, "Database connection failed")); + let display_str = format!("{err}"); + assert!(display_str.contains("ConnectionFailed")); + assert!(display_str.contains("Database connection failed")); + } + + #[test] + fn test_error_display_with_detail() { + let err = EtlError::from(( + ErrorKind::QueryFailed, + "SQL query failed", + "Invalid table name".to_string(), + )); + let display_str = format!("{err}"); + assert!(display_str.contains("QueryFailed")); + assert!(display_str.contains("SQL query failed")); + assert!(display_str.contains("Invalid table name")); + } + + #[test] + fn test_multiple_errors_display() { + let errors = vec![ + EtlError::from((ErrorKind::ValidationError, "Invalid schema")), + EtlError::from((ErrorKind::ConversionError, "Type mismatch")), + ]; + let multi_err = EtlError::many(errors); + let display_str = format!("{multi_err}"); + assert!(display_str.contains("Multiple errors")); + assert!(display_str.contains("2 total")); + } + + #[test] + fn test_macro_usage() { + let err = etl_error!(ErrorKind::ValidationError, "Invalid data format"); + assert_eq!(err.kind(), ErrorKind::ValidationError); + assert_eq!(err.detail(), None); + + let err_with_detail = etl_error!( + ErrorKind::ConversionError, + "Type conversion failed", + "Cannot convert string to integer: 'abc'" + ); + assert_eq!(err_with_detail.kind(), ErrorKind::ConversionError); + assert!(err_with_detail.detail().unwrap().contains("Cannot convert")); + } + + #[test] + fn test_bail_macro() { + fn test_function() -> EtlResult { + bail!(ErrorKind::ValidationError, "Test error"); + } + + fn test_function_with_detail() -> EtlResult { + bail!( + ErrorKind::ConversionError, + "Test error", + "Additional detail" + ); + } + + let result = test_function(); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::ValidationError); + + let result = test_function_with_detail(); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::ConversionError); + assert!(err.detail().unwrap().contains("Additional detail")); + } + + #[test] + fn test_nested_multiple_errors() { + let inner_errors = vec![ + EtlError::from((ErrorKind::ConversionError, "Inner error 1")), + EtlError::from((ErrorKind::ValidationError, "Inner error 2")), + ]; + let inner_multi = EtlError::many(inner_errors); + + let outer_errors = vec![ + inner_multi, + EtlError::from((ErrorKind::IoError, "Outer error")), + ]; + let outer_multi = EtlError::many(outer_errors); + + let kinds = outer_multi.kinds(); + assert_eq!(kinds.len(), 3); + assert!(kinds.contains(&ErrorKind::ConversionError)); + assert!(kinds.contains(&ErrorKind::ValidationError)); + assert!(kinds.contains(&ErrorKind::IoError)); + } + + #[test] + #[cfg(feature = "bigquery")] + fn test_bigquery_error_mapping() { + use gcp_bigquery_client::error::BQError; + + // Test authentication error + let auth_err = BQError::NoToken; + let etl_err = EtlError::from(auth_err); + assert_eq!(etl_err.kind(), ErrorKind::AuthenticationError); + + // Test invalid data error + let invalid_col_err = BQError::InvalidColumnIndex { col_index: 5 }; + let etl_err = EtlError::from(invalid_col_err); + assert_eq!(etl_err.kind(), ErrorKind::InvalidData); + + // Test conversion error + let type_err = BQError::InvalidColumnType { + col_index: 0, + col_type: "STRING".to_string(), + type_requested: "INTEGER".to_string(), + }; + let etl_err = EtlError::from(type_err); + assert_eq!(etl_err.kind(), ErrorKind::ConversionError); + } + + #[test] + fn test_postgres_error_mapping() { + // Test that our PostgreSQL error mapping logic is correctly structured + // by verifying that we can convert standard IO errors to ETL errors + let io_err = + std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "Connection refused"); + let etl_err = EtlError::from(io_err); + assert_eq!(etl_err.kind(), ErrorKind::IoError); + assert!(etl_err.detail().unwrap().contains("Connection refused")); + + // Note: Testing actual PostgreSQL SQLSTATE mapping would require + // creating mock database errors, which is complex. The mapping logic + // is verified through the comprehensive match patterns above. + } + + #[test] + fn test_json_error_classification() { + // Test syntax error during deserialization + let json_err = serde_json::from_str::("invalid json").unwrap_err(); + let etl_err = EtlError::from(json_err); + assert_eq!(etl_err.kind(), ErrorKind::DeserializationError); + assert!(etl_err.detail().unwrap().contains("expected")); + + // Test data error during deserialization + let json_err = serde_json::from_str::("\"not_a_bool\"").unwrap_err(); + let etl_err = EtlError::from(json_err); + assert_eq!(etl_err.kind(), ErrorKind::DeserializationError); + assert!(etl_err.detail().is_some()); + } +} diff --git a/etl/src/failpoints.rs b/etl/src/failpoints.rs index 947339aa9..cb19a20c1 100644 --- a/etl/src/failpoints.rs +++ b/etl/src/failpoints.rs @@ -1 +1,2 @@ -pub const START_TABLE_SYNC_AFTER_DATA_SYNC: &str = "start_table_sync.after_data_sync"; +pub const START_TABLE_SYNC_AFTER_DATA_SYNC_PANIC: &str = "start_table_sync.after_data_sync.panic"; +pub const START_TABLE_SYNC_AFTER_DATA_SYNC_ERROR: &str = "start_table_sync.after_data_sync.error"; diff --git a/etl/src/lib.rs b/etl/src/lib.rs index ad6f32a24..cb0fc8c28 100644 --- a/etl/src/lib.rs +++ b/etl/src/lib.rs @@ -3,8 +3,10 @@ pub mod concurrency; pub mod conversions; pub mod destination; pub mod encryption; +pub mod error; #[cfg(feature = "failpoints")] pub mod failpoints; +mod macros; pub mod pipeline; pub mod replication; pub mod schema; diff --git a/etl/src/macros.rs b/etl/src/macros.rs new file mode 100644 index 000000000..fae64a934 --- /dev/null +++ b/etl/src/macros.rs @@ -0,0 +1,19 @@ +#[macro_export] +macro_rules! etl_error { + ($kind:expr, $desc:expr) => { + EtlError::from(($kind, $desc)) + }; + ($kind:expr, $desc:expr, $detail:expr) => { + EtlError::from(($kind, $desc, $detail.to_string())) + }; +} + +#[macro_export] +macro_rules! bail { + ($kind:expr, $desc:expr) => { + return Err($crate::etl_error!($kind, $desc)) + }; + ($kind:expr, $desc:expr, $detail:expr) => { + return Err($crate::etl_error!($kind, $desc, $detail)) + }; +} diff --git a/etl/src/pipeline.rs b/etl/src/pipeline.rs index 651d583ec..2eb58b647 100644 --- a/etl/src/pipeline.rs +++ b/etl/src/pipeline.rs @@ -1,45 +1,20 @@ use config::shared::PipelineConfig; use std::sync::Arc; -use thiserror::Error; -use tokio::sync::{Semaphore, watch}; +use tokio::sync::Semaphore; use tracing::{error, info}; +use crate::bail; use crate::concurrency::shutdown::{ShutdownTx, create_shutdown_channel}; -use crate::destination::base::{Destination, DestinationError}; -use crate::replication::client::{PgReplicationClient, PgReplicationError}; +use crate::destination::base::Destination; +use crate::error::{ErrorKind, EtlError, EtlResult}; +use crate::replication::client::PgReplicationClient; use crate::schema::cache::SchemaCache; -use crate::state::store::base::{StateStore, StateStoreError}; +use crate::state::store::base::StateStore; use crate::state::table::TableReplicationPhase; -use crate::workers::apply::{ApplyWorker, ApplyWorkerError, ApplyWorkerHandle}; -use crate::workers::base::{Worker, WorkerHandle, WorkerWaitErrors}; +use crate::workers::apply::{ApplyWorker, ApplyWorkerHandle}; +use crate::workers::base::{Worker, WorkerHandle}; use crate::workers::pool::TableSyncWorkerPool; -#[derive(Debug, Error)] -pub enum PipelineError { - #[error("One or more workers failed: {0}")] - OneOrMoreWorkersFailed(WorkerWaitErrors), - - #[error("PostgreSQL replication operation failed: {0}")] - PgReplicationClient(#[from] PgReplicationError), - - #[error("Apply worker failed to start in the pipeline: {0}")] - ApplyWorkerFailedOnStart(#[from] ApplyWorkerError), - - #[error("An error happened in the state store: {0}")] - StateStore(#[from] StateStoreError), - - #[error("An error occurred in the destination: {0}")] - Destination(#[from] DestinationError), - - #[error( - "An error occurred while shutting down the pipeline, likely because no workers are running: {0}" - )] - ShutdownFailed(#[from] watch::error::SendError<()>), - - #[error("The publication '{0}' does not exist in the database")] - MissingPublication(String), -} - #[derive(Debug)] enum PipelineWorkers { NotStarted, @@ -94,7 +69,7 @@ where self.shutdown_tx.clone() } - pub async fn start(&mut self) -> Result<(), PipelineError> { + pub async fn start(&mut self) -> EtlResult<()> { info!( "starting pipeline for publication '{}' with id {}", self.config.publication_name, self.id @@ -144,7 +119,7 @@ where Ok(()) } - async fn prepare_schema_cache(&self, schema_cache: &SchemaCache) -> Result<(), PipelineError> { + async fn prepare_schema_cache(&self, schema_cache: &SchemaCache) -> EtlResult<()> { // We initialize the schema cache, which is local to a pipeline, and we try to load existing // schemas that were previously stored at the destination (if any). let table_schemas = self.destination.load_table_schemas().await?; @@ -156,7 +131,7 @@ where async fn initialize_table_states( &self, replication_client: &PgReplicationClient, - ) -> Result<(), PipelineError> { + ) -> EtlResult<()> { info!( "initializing table states for tables in publication '{}'", self.config.publication_name @@ -171,9 +146,15 @@ where "publication '{}' does not exist in the database", self.config.publication_name ); - return Err(PipelineError::MissingPublication( - self.config.publication_name.clone(), - )); + + bail!( + ErrorKind::ConfigError, + "Missing publication", + format!( + "The publication '{}' does not exist in the database", + self.config.publication_name + ) + ); } let table_ids = replication_client @@ -193,10 +174,9 @@ where Ok(()) } - pub async fn wait(self) -> Result<(), PipelineError> { + pub async fn wait(self) -> EtlResult<()> { let PipelineWorkers::Started { apply_worker, pool } = self.workers else { info!("pipeline was not started, nothing to wait for"); - return Ok(()); }; @@ -231,33 +211,33 @@ where // We wait for all table sync workers to finish. let table_sync_workers_result = pool.wait_all().await; if let Err(err) = table_sync_workers_result { - errors.extend(err.0); - + errors.push(err); info!("one or more table sync workers failed with an error"); } else { info!("all table sync workers completed successfully"); } if !errors.is_empty() { - return Err(PipelineError::OneOrMoreWorkersFailed(WorkerWaitErrors( - errors, - ))); + return Err(errors.into()); } Ok(()) } #[allow(clippy::result_large_err)] - pub fn shutdown(&self) -> Result<(), PipelineError> { + pub fn shutdown(&self) { info!("trying to shut down the pipeline"); - self.shutdown_tx.shutdown()?; - info!("shut down signal successfully sent to all workers"); - Ok(()) + if let Err(err) = self.shutdown_tx.shutdown() { + error!("failed to send shutdown signal to the pipeline: {}", err); + return; + } + + info!("shut down signal successfully sent to all workers"); } - pub async fn shutdown_and_wait(self) -> Result<(), PipelineError> { - self.shutdown()?; + pub async fn shutdown_and_wait(self) -> EtlResult<()> { + self.shutdown(); self.wait().await } } diff --git a/etl/src/replication/apply.rs b/etl/src/replication/apply.rs index a693f443a..92f94732e 100644 --- a/etl/src/replication/apply.rs +++ b/etl/src/replication/apply.rs @@ -1,16 +1,3 @@ -use crate::concurrency::shutdown::ShutdownRx; -use crate::conversions::event::{Event, EventConversionError, EventType, convert_message_to_event}; -use crate::destination::base::{Destination, DestinationError}; -use crate::pipeline::PipelineId; -use crate::replication::client::{PgReplicationClient, PgReplicationError}; -use crate::replication::slot::{SlotError, get_slot_name}; -use crate::replication::stream::{EventsStream, EventsStreamError}; -use crate::schema::cache::SchemaCache; -use crate::workers::apply::ApplyWorkerHookError; -use crate::workers::base::WorkerType; -use crate::workers::table_sync::TableSyncWorkerHookError; - -use crate::concurrency::signal::SignalRx; use config::shared::PipelineConfig; use futures::{FutureExt, StreamExt}; use postgres::schema::TableId; @@ -20,96 +7,48 @@ use std::future::{Future, pending}; use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, Instant}; -use thiserror::Error; use tokio::pin; use tokio_postgres::types::PgLsn; -use tracing::{debug, error, info}; +use tracing::{debug, info}; + +use crate::concurrency::shutdown::ShutdownRx; +use crate::concurrency::signal::SignalRx; +use crate::conversions::event::{Event, EventType, convert_message_to_event}; +use crate::destination::base::Destination; +use crate::error::{ErrorKind, EtlError, EtlResult}; +use crate::pipeline::PipelineId; +use crate::replication::client::PgReplicationClient; +use crate::replication::slot::get_slot_name; +use crate::replication::stream::EventsStream; +use crate::schema::cache::SchemaCache; +use crate::workers::base::WorkerType; +use crate::{bail, etl_error}; /// The amount of milliseconds that pass between one refresh and the other of the system, in case no /// events or shutdown signal are received. const REFRESH_INTERVAL: Duration = Duration::from_millis(1000); -// TODO: figure out how to break the cycle and remove `Box`. -#[derive(Debug, Error)] -pub enum ApplyLoopError { - #[error("Apply worker hook operation failed: {0}")] - ApplyWorkerHook(Box), - - #[error("Table sync worker hook operation failed: {0}")] - TableSyncWorkerHook(Box), - - #[error("A Postgres replication error occurred in the apply loop: {0}")] - PgReplication(#[from] PgReplicationError), - - #[error("An error occurred while streaming logical replication changes in the apply loop: {0}")] - LogicalReplicationStreamFailed(#[from] EventsStreamError), - - #[error("Could not generate slot name in the apply loop: {0}")] - Slot(#[from] SlotError), - - #[error("An error occurred while building an event from a message in the apply loop: {0}")] - EventConversion(#[from] EventConversionError), - - #[error("An error occurred when interacting with the destination in the apply loop: {0}")] - Destination(#[from] DestinationError), - - #[error("A transaction should have started for the action ({0}) to be performed")] - InvalidTransaction(String), - - #[error("Incorrect commit LSN {0} in COMMIT message (expected {1})")] - InvalidCommitLsn(PgLsn, PgLsn), - - #[error("An invalid event {0} was received (expected {1})")] - InvalidEvent(EventType, EventType), - - #[error("The table schema for table {0} was not found in the cache")] - MissingTableSchema(TableId), - - #[error("The received table schema doesn't match the table schema loaded during table sync")] - MismatchedTableSchema, -} - -impl From for ApplyLoopError { - fn from(err: ApplyWorkerHookError) -> Self { - ApplyLoopError::ApplyWorkerHook(Box::new(err)) - } -} - -impl From for ApplyLoopError { - fn from(err: TableSyncWorkerHookError) -> Self { - ApplyLoopError::TableSyncWorkerHook(Box::new(err)) - } -} - #[derive(Debug, Copy, Clone)] pub enum ApplyLoopResult { ApplyStopped, } pub trait ApplyLoopHook { - type Error: Into; - - fn before_loop( - &self, - start_lsn: PgLsn, - ) -> impl Future> + Send; + fn before_loop(&self, start_lsn: PgLsn) -> impl Future> + Send; fn process_syncing_tables( &self, current_lsn: PgLsn, update_state: bool, - ) -> impl Future> + Send; + ) -> impl Future> + Send; - fn skip_table( - &self, - table_id: TableId, - ) -> impl Future> + Send; + fn skip_table(&self, table_id: TableId) -> impl Future> + Send; fn should_apply_changes( &self, table_id: TableId, remote_final_lsn: PgLsn, - ) -> impl Future> + Send; + ) -> impl Future> + Send; fn worker_type(&self) -> WorkerType; } @@ -262,11 +201,10 @@ pub async fn start_apply_loop( hook: T, mut shutdown_rx: ShutdownRx, mut force_syncing_tables_rx: Option, -) -> Result +) -> EtlResult where D: Destination + Clone + Send + 'static, T: ApplyLoopHook, - ApplyLoopError: From<::Error>, { info!( "starting apply loop in worker '{:?}' from lsn {}", @@ -356,6 +294,7 @@ where // sync at the next transaction boundary. if !state.handling_transaction() { debug!("forcefully processing syncing tables"); + let continue_loop = hook.process_syncing_tables(state.next_status_update.flush_lsn, true).await?; if !continue_loop { return Ok(ApplyLoopResult::ApplyStopped); @@ -389,11 +328,10 @@ async fn handle_replication_message_batch( hook: &T, max_batch_size: usize, max_batch_fill_duration: Duration, -) -> Result +) -> EtlResult where D: Destination + Clone + Send + 'static, T: ApplyLoopHook, - ApplyLoopError: From<::Error>, { let result = handle_replication_message(state, events_stream, message, schema_cache, hook).await?; @@ -425,11 +363,10 @@ async fn try_send_batch( hook: &T, max_batch_size: usize, max_batch_fill_duration: Duration, -) -> Result +) -> EtlResult where D: Destination + Clone + Send + 'static, T: ApplyLoopHook, - ApplyLoopError: From<::Error>, { let elapsed = state.last_batch_send_time.elapsed(); // `elapsed` could be zero in case current time is earlier than `last_batch_send_time`. @@ -508,10 +445,9 @@ async fn handle_replication_message( message: ReplicationMessage, schema_cache: &SchemaCache, hook: &T, -) -> Result +) -> EtlResult where T: ApplyLoopHook, - ApplyLoopError: From<::Error>, { match message { ReplicationMessage::XLogData(message) => { @@ -565,10 +501,9 @@ async fn handle_logical_replication_message( message: LogicalReplicationMessage, schema_cache: &SchemaCache, hook: &T, -) -> Result +) -> EtlResult where T: ApplyLoopHook, - ApplyLoopError: From<::Error>, { // We perform the conversion of the message to our own event format which is used downstream // by the destination. @@ -613,19 +548,20 @@ where } #[allow(clippy::result_large_err)] -fn get_commit_lsn( - state: &ApplyLoopState, - message: &LogicalReplicationMessage, -) -> Result { +fn get_commit_lsn(state: &ApplyLoopState, message: &LogicalReplicationMessage) -> EtlResult { // If we are in a `Begin` message, the `commit_lsn` is the `final_lsn` of the payload, in all the // other cases we read the `remote_final_lsn` which should be always set in case we are within or // at the end of a transaction (meaning that the event type is different from `Begin`). if let LogicalReplicationMessage::Begin(message) = message { Ok(PgLsn::from(message.final_lsn())) } else { - state - .remote_final_lsn - .ok_or_else(|| ApplyLoopError::InvalidTransaction("get_commit_lsn".to_owned())) + state.remote_final_lsn.ok_or_else(|| { + etl_error!( + ErrorKind::InvalidState, + "Invalid transaction", + "A transaction should have started for get_commit_lsn to be performed" + ) + }) } } @@ -633,9 +569,16 @@ async fn handle_begin_message( state: &mut ApplyLoopState, event: Event, message: &protocol::BeginBody, -) -> Result { +) -> EtlResult { let EventType::Begin = event.event_type() else { - return Err(ApplyLoopError::InvalidEvent(event.into(), EventType::Begin)); + bail!( + ErrorKind::ValidationError, + "Invalid event", + format!( + "An invalid event {event:?} was received (expected {:?})", + EventType::Begin + ) + ); }; // We track the final lsn of this transaction, which should be equal to the `commit_lsn` of the @@ -656,25 +599,30 @@ async fn handle_commit_message( event: Event, message: &protocol::CommitBody, hook: &T, -) -> Result +) -> EtlResult where T: ApplyLoopHook, - ApplyLoopError: From<::Error>, { let EventType::Commit = event.event_type() else { - return Err(ApplyLoopError::InvalidEvent( - event.into(), - EventType::Commit, - )); + bail!( + ErrorKind::ValidationError, + "Invalid event", + format!( + "An invalid event {event:?} was received (expected {:?})", + EventType::Commit + ) + ); }; // We take the LSN that belongs to the current transaction, however, if there is no // LSN, it means that a `Begin` message was not received before this `Commit` which means // we are in an inconsistent state. let Some(remote_final_lsn) = state.remote_final_lsn.take() else { - return Err(ApplyLoopError::InvalidTransaction( - "handle_commit_message".to_owned(), - )); + bail!( + ErrorKind::InvalidState, + "Invalid transaction", + "A transaction should have started for handle_commit_message to be performed" + ); }; // If the commit lsn of the message is different from the remote final lsn, it means that the @@ -682,10 +630,14 @@ where // we want to bail assuming we are in an inconsistent state. let commit_lsn = PgLsn::from(message.commit_lsn()); if commit_lsn != remote_final_lsn { - return Err(ApplyLoopError::InvalidCommitLsn( - commit_lsn, - remote_final_lsn, - )); + bail!( + ErrorKind::ValidationError, + "Invalid commit LSN", + format!( + "Incorrect commit LSN {} in COMMIT message (expected {})", + commit_lsn, remote_final_lsn + ) + ); } let end_lsn = PgLsn::from(message.end_lsn()); @@ -727,22 +679,27 @@ async fn handle_relation_message( message: &protocol::RelationBody, schema_cache: &SchemaCache, hook: &T, -) -> Result +) -> EtlResult where T: ApplyLoopHook, - ApplyLoopError: From<::Error>, { let Event::Relation(event) = event else { - return Err(ApplyLoopError::InvalidEvent( - event.into(), - EventType::Relation, - )); + bail!( + ErrorKind::ValidationError, + "Invalid event", + format!( + "An invalid event {event:?} was received (expected {:?})", + EventType::Relation + ) + ); }; let Some(remote_final_lsn) = state.remote_final_lsn else { - return Err(ApplyLoopError::InvalidTransaction( - "handle_relation_message".to_owned(), - )); + bail!( + ErrorKind::InvalidState, + "Invalid transaction", + "A transaction should have started for handle_relation_message to be performed" + ); }; if !hook @@ -759,9 +716,14 @@ where let Some(existing_table_schema) = schema_cache.get_table_schema_ref(&TableId::new(message.rel_id())) else { - return Err(ApplyLoopError::MissingTableSchema(TableId::new( - message.rel_id(), - ))); + bail!( + ErrorKind::MissingTableSchema, + "Table not found in the schema cache", + format!( + "The table schema for table {} was not found in the cache", + message.rel_id() + ) + ); }; // We compare the table schema from the relation message with the existing schema (if any). @@ -788,22 +750,27 @@ async fn handle_insert_message( event: Event, message: &protocol::InsertBody, hook: &T, -) -> Result +) -> EtlResult where T: ApplyLoopHook, - ApplyLoopError: From<::Error>, { let Event::Insert(event) = event else { - return Err(ApplyLoopError::InvalidEvent( - event.into(), - EventType::Insert, - )); + bail!( + ErrorKind::ValidationError, + "Invalid event", + format!( + "An invalid event {event:?} was received (expected {:?})", + EventType::Insert + ) + ); }; let Some(remote_final_lsn) = state.remote_final_lsn else { - return Err(ApplyLoopError::InvalidTransaction( - "handle_insert_message".to_owned(), - )); + bail!( + ErrorKind::InvalidState, + "Invalid transaction", + "A transaction should have started for handle_insert_message to be performed" + ); }; if !hook @@ -826,22 +793,27 @@ async fn handle_update_message( event: Event, message: &protocol::UpdateBody, hook: &T, -) -> Result +) -> EtlResult where T: ApplyLoopHook, - ApplyLoopError: From<::Error>, { let Event::Update(event) = event else { - return Err(ApplyLoopError::InvalidEvent( - event.into(), - EventType::Update, - )); + bail!( + ErrorKind::ValidationError, + "Invalid event", + format!( + "An invalid event {event:?} was received (expected {:?})", + EventType::Update + ) + ); }; let Some(remote_final_lsn) = state.remote_final_lsn else { - return Err(ApplyLoopError::InvalidTransaction( - "handle_update_message".to_owned(), - )); + bail!( + ErrorKind::InvalidState, + "Invalid transaction", + "A transaction should have started for handle_update_message to be performed" + ); }; if !hook @@ -864,22 +836,27 @@ async fn handle_delete_message( event: Event, message: &protocol::DeleteBody, hook: &T, -) -> Result +) -> EtlResult where T: ApplyLoopHook, - ApplyLoopError: From<::Error>, { let Event::Delete(event) = event else { - return Err(ApplyLoopError::InvalidEvent( - event.into(), - EventType::Delete, - )); + bail!( + ErrorKind::ValidationError, + "Invalid event", + format!( + "An invalid event {event:?} was received (expected {:?})", + EventType::Delete + ) + ); }; let Some(remote_final_lsn) = state.remote_final_lsn else { - return Err(ApplyLoopError::InvalidTransaction( - "handle_delete_message".to_owned(), - )); + bail!( + ErrorKind::InvalidState, + "Invalid transaction", + "A transaction should have started for handle_delete_message to be performed" + ); }; if !hook @@ -902,22 +879,27 @@ async fn handle_truncate_message( event: Event, message: &protocol::TruncateBody, hook: &T, -) -> Result +) -> EtlResult where T: ApplyLoopHook, - ApplyLoopError: From<::Error>, { let Event::Truncate(mut event) = event else { - return Err(ApplyLoopError::InvalidEvent( - event.into(), - EventType::Truncate, - )); + bail!( + ErrorKind::ValidationError, + "Invalid event", + format!( + "An invalid event {event:?} was received (expected {:?})", + EventType::Truncate + ) + ); }; let Some(remote_final_lsn) = state.remote_final_lsn else { - return Err(ApplyLoopError::InvalidTransaction( - "handle_truncate_message".to_owned(), - )); + bail!( + ErrorKind::InvalidState, + "Invalid transaction", + "A transaction should have started for handle_truncate_message to be performed" + ); }; let mut rel_ids = Vec::with_capacity(message.rel_ids().len()); diff --git a/etl/src/replication/client.rs b/etl/src/replication/client.rs index d571e8037..300b2e3d5 100644 --- a/etl/src/replication/client.rs +++ b/etl/src/replication/client.rs @@ -1,3 +1,5 @@ +use crate::error::{ErrorKind, EtlError, EtlResult}; +use crate::{bail, etl_error}; use config::shared::{IntoConnectOptions, PgConnectionConfig}; use pg_escape::{quote_identifier, quote_literal}; use postgres::schema::{ColumnSchema, TableId, TableName, TableSchema}; @@ -8,7 +10,6 @@ use std::collections::HashMap; use std::fmt; use std::io::BufReader; use std::sync::Arc; -use thiserror::Error; use tokio_postgres::error::SqlState; use tokio_postgres::tls::MakeTlsConnect; use tokio_postgres::{ @@ -41,58 +42,6 @@ where tokio::spawn(task); } -/// Errors that can occur when using the PostgreSQL replication client. -#[derive(Debug, Error)] -pub enum PgReplicationError { - /// Errors from the underlying PostgreSQL client - #[error("PostgreSQL client operation failed: {0}")] - Client(#[from] tokio_postgres::Error), - - /// Errors related to TLS/SSL configuration - #[error("TLS configuration failed: {0}")] - Tls(#[from] rustls::Error), - - /// Errors related to replication slot operations - #[error("Failed to create replication slot")] - SlotCreationFailed, - - #[error("Replication slot '{0}' not found in database")] - SlotNotFound(String), - - #[error("Replication slot '{0}' already exists in database")] - SlotAlreadyExists(String), - - #[error("Invalid replication slot response: missing required fields in server response")] - SlotResponseInvalid, - - /// Errors related to database schema and objects - #[error("Table '{0}' not found in database")] - TableNotFound(TableName), - - #[error("Column '{0}' not found in table '{1}'")] - ColumnNotFound(String, String), - - #[error("Failed to parse value from column '{0}' in table '{1}': {2}")] - ColumnParsingFailed(String, String, String), - - #[error("Publication '{0}' not found in database")] - PublicationNotFound(String), - - /// Errors related to data type handling - #[error( - "Unsupported column type '{0}' (OID: {1}) in table '{2}': type is not supported for replication" - )] - UnsupportedColumnType(String, u32, String), - - #[error( - "Unsupported replica identity '{0}' in table: only 'default' or 'full' replica identities are supported" - )] - UnsupportedReplicaIdentity(String), - - #[error("Io error: {0}")] - Io(#[from] std::io::Error), -} - #[derive(Debug, Clone)] pub struct CreateSlotResult { pub consistent_point: PgLsn, @@ -135,7 +84,7 @@ impl PgReplicationSlotTransaction { /// /// The transaction is started with a repeatable read isolation level and uses the /// snapshot associated with the provided slot. - async fn new(client: PgReplicationClient) -> PgReplicationResult { + async fn new(client: PgReplicationClient) -> EtlResult { client.begin_tx().await?; Ok(Self { client }) @@ -149,7 +98,7 @@ impl PgReplicationSlotTransaction { &self, table_ids: &[TableId], publication_name: Option<&str>, - ) -> PgReplicationResult> { + ) -> EtlResult> { self.client .get_table_schemas(table_ids, publication_name) .await @@ -163,7 +112,7 @@ impl PgReplicationSlotTransaction { &self, table_id: TableId, publication: Option<&str>, - ) -> PgReplicationResult { + ) -> EtlResult { self.client.get_table_schema(table_id, publication).await } @@ -174,19 +123,19 @@ impl PgReplicationSlotTransaction { &self, table_id: TableId, column_schemas: &[ColumnSchema], - ) -> PgReplicationResult { + ) -> EtlResult { self.client .get_table_copy_stream(table_id, column_schemas) .await } /// Commits the current transaction. - pub async fn commit(self) -> PgReplicationResult<()> { + pub async fn commit(self) -> EtlResult<()> { self.client.commit_tx().await } /// Rolls back the current transaction. - pub async fn rollback(self) -> PgReplicationResult<()> { + pub async fn rollback(self) -> EtlResult<()> { self.client.rollback_tx().await } } @@ -200,15 +149,12 @@ pub struct PgReplicationClient { client: Arc, } -/// Update the type alias to use the new error type -pub type PgReplicationResult = Result; - impl PgReplicationClient { /// Establishes a connection to PostgreSQL. The connection uses TLS if configured in the - /// passed [`PgConnectionConfig`]. + /// supplied [`PgConnectionConfig`]. /// /// The connection is configured for logical replication mode - pub async fn connect(pg_connection_config: PgConnectionConfig) -> PgReplicationResult { + pub async fn connect(pg_connection_config: PgConnectionConfig) -> EtlResult { match pg_connection_config.tls.enabled { true => PgReplicationClient::connect_tls(pg_connection_config).await, false => PgReplicationClient::connect_no_tls(pg_connection_config).await, @@ -218,7 +164,7 @@ impl PgReplicationClient { /// Establishes a connection to PostgreSQL without TLS encryption. /// /// The connection is configured for logical replication mode. - async fn connect_no_tls(pg_connection_config: PgConnectionConfig) -> PgReplicationResult { + async fn connect_no_tls(pg_connection_config: PgConnectionConfig) -> EtlResult { let mut config: Config = pg_connection_config.clone().with_db(); config.replication_mode(ReplicationMode::Logical); @@ -235,7 +181,7 @@ impl PgReplicationClient { /// Establishes a TLS-encrypted connection to PostgreSQL. /// /// The connection is configured for logical replication mode - async fn connect_tls(pg_connection_config: PgConnectionConfig) -> PgReplicationResult { + async fn connect_tls(pg_connection_config: PgConnectionConfig) -> EtlResult { let mut config: Config = pg_connection_config.clone().with_db(); config.replication_mode(ReplicationMode::Logical); @@ -268,7 +214,7 @@ impl PgReplicationClient { pub async fn create_slot_with_transaction( &self, slot_name: &str, - ) -> PgReplicationResult<(PgReplicationSlotTransaction, CreateSlotResult)> { + ) -> EtlResult<(PgReplicationSlotTransaction, CreateSlotResult)> { // TODO: check if we want to consume the client and return it on commit to avoid any other // operations on a connection that has started a transaction. let transaction = PgReplicationSlotTransaction::new(self.clone()).await?; @@ -278,14 +224,14 @@ impl PgReplicationClient { } /// Creates a new logical replication slot with the specified name and no snapshot. - pub async fn create_slot(&self, slot_name: &str) -> PgReplicationResult { + pub async fn create_slot(&self, slot_name: &str) -> EtlResult { self.create_slot_internal(slot_name, false).await } /// Gets the slot by `slot_name`. /// /// Returns an error in case of failure or missing slot. - pub async fn get_slot(&self, slot_name: &str) -> PgReplicationResult { + pub async fn get_slot(&self, slot_name: &str) -> EtlResult { let query = format!( r#"select confirmed_flush_lsn from pg_replication_slots where slot_name = {};"#, quote_literal(slot_name) @@ -308,7 +254,11 @@ impl PgReplicationClient { } } - Err(PgReplicationError::SlotNotFound(slot_name.to_string())) + bail!( + ErrorKind::ReplicationSlotNotFound, + "Replication slot not found", + format!("Replication slot '{}' not found in database", slot_name) + ); } /// Gets an existing replication slot or creates a new one if it doesn't exist. @@ -320,17 +270,14 @@ impl PgReplicationClient { /// - A boolean indicating whether the slot was created (true) or already existed (false) /// - The slot result containing either the confirmed_flush_lsn (for existing slots) /// or the consistent_point (for newly created slots) - pub async fn get_or_create_slot( - &self, - slot_name: &str, - ) -> PgReplicationResult { + pub async fn get_or_create_slot(&self, slot_name: &str) -> EtlResult { match self.get_slot(slot_name).await { Ok(slot) => { info!("using existing replication slot '{}'", slot_name); Ok(GetOrCreateSlotResult::GetSlot(slot)) } - Err(PgReplicationError::SlotNotFound(_)) => { + Err(err) if err.kind() == ErrorKind::ReplicationSlotNotFound => { info!("creating new replication slot '{}'", slot_name); let create_result = self.create_slot_internal(slot_name, false).await?; @@ -344,7 +291,7 @@ impl PgReplicationClient { /// Deletes a replication slot with the specified name. /// /// Returns an error if the slot doesn't exist or if there are any issues with the deletion. - pub async fn delete_slot(&self, slot_name: &str) -> PgReplicationResult<()> { + pub async fn delete_slot(&self, slot_name: &str) -> EtlResult<()> { info!("deleting replication slot '{}'", slot_name); // Do not convert the query or the options to lowercase, see comment in `create_slot_internal`. let query = format!( @@ -365,18 +312,27 @@ impl PgReplicationClient { "attempted to delete non-existent replication slot '{}'", slot_name ); - return Err(PgReplicationError::SlotNotFound(slot_name.to_string())); + + bail!( + ErrorKind::ReplicationSlotNotFound, + "Replication slot not found", + format!( + "Replication slot '{}' not found in database while attempting its deletion", + slot_name + ) + ); } } error!("failed to delete replication slot '{}': {}", slot_name, err); + Err(err.into()) } } } /// Checks if a publication with the given name exists. - pub async fn publication_exists(&self, publication: &str) -> PgReplicationResult { + pub async fn publication_exists(&self, publication: &str) -> EtlResult { let publication_exists_query = format!( "select 1 as exists from pg_publication where pubname = {};", quote_literal(publication) @@ -386,6 +342,7 @@ impl PgReplicationClient { return Ok(true); } } + Ok(false) } @@ -393,7 +350,7 @@ impl PgReplicationClient { pub async fn get_publication_table_names( &self, publication_name: &str, - ) -> PgReplicationResult> { + ) -> EtlResult> { let publication_query = format!( "select schemaname, tablename from pg_publication_tables where pubname = {};", quote_literal(publication_name) @@ -420,7 +377,7 @@ impl PgReplicationClient { pub async fn get_publication_table_ids( &self, publication_name: &str, - ) -> PgReplicationResult> { + ) -> EtlResult> { let publication_query = format!( "select c.oid from pg_publication_tables pt join pg_class c on c.relname = pt.tablename @@ -429,16 +386,16 @@ impl PgReplicationClient { quote_literal(publication_name) ); - let mut table_oids = vec![]; + let mut table_ids = vec![]; for msg in self.client.simple_query(&publication_query).await? { if let SimpleQueryMessage::Row(row) = msg { - let oid = Self::get_row_value::(&row, "oid", "pg_class").await?; - - table_oids.push(oid); + // For the sake of simplicity, we refer to the table oid as table id. + let table_id = Self::get_row_value::(&row, "oid", "pg_class").await?; + table_ids.push(table_id); } } - Ok(table_oids) + Ok(table_ids) } /// Starts a logical replication stream from the specified publication and slot. @@ -449,7 +406,7 @@ impl PgReplicationClient { publication_name: &str, slot_name: &str, start_lsn: PgLsn, - ) -> PgReplicationResult { + ) -> EtlResult { info!( "starting logical replication from publication '{}' with slot named '{}' at lsn {}", publication_name, slot_name, start_lsn @@ -469,7 +426,6 @@ impl PgReplicationClient { ); let copy_stream = self.client.copy_both_simple::(&query).await?; - let stream = LogicalReplicationStream::new(copy_stream); Ok(stream) @@ -479,7 +435,7 @@ impl PgReplicationClient { /// /// The transaction doesn't make any assumptions about the snapshot in use, since this is a /// concern of the statements issued within the transaction. - async fn begin_tx(&self) -> PgReplicationResult<()> { + async fn begin_tx(&self) -> EtlResult<()> { self.client .simple_query("begin read only isolation level repeatable read;") .await?; @@ -488,14 +444,16 @@ impl PgReplicationClient { } /// Commits the current transaction. - async fn commit_tx(&self) -> PgReplicationResult<()> { + async fn commit_tx(&self) -> EtlResult<()> { self.client.simple_query("commit;").await?; + Ok(()) } /// Rolls back the current transaction. - async fn rollback_tx(&self) -> PgReplicationResult<()> { + async fn rollback_tx(&self) -> EtlResult<()> { self.client.simple_query("rollback;").await?; + Ok(()) } @@ -506,7 +464,7 @@ impl PgReplicationClient { &self, slot_name: &str, use_snapshot: bool, - ) -> PgReplicationResult { + ) -> EtlResult { // Do not convert the query or the options to lowercase, since the lexer for // replication commands (repl_scanner.l) in Postgres code expects the commands // in uppercase. This probably should be fixed in upstream, but for now we will @@ -540,7 +498,14 @@ impl PgReplicationClient { Err(err) => { if let Some(code) = err.code() { if *code == SqlState::DUPLICATE_OBJECT { - return Err(PgReplicationError::SlotAlreadyExists(slot_name.to_string())); + bail!( + ErrorKind::ReplicationSlotAlreadyExists, + "Replication slot already exists", + format!( + "Replication slot '{}' already exists in database", + slot_name + ) + ); } } @@ -548,7 +513,10 @@ impl PgReplicationClient { } } - Err(PgReplicationError::SlotCreationFailed) + Err(etl_error!( + ErrorKind::ReplicationSlotNotCreated, + "Failed to create replication slot" + )) } /// Retrieves schema information for multiple tables. @@ -558,7 +526,7 @@ impl PgReplicationClient { &self, table_ids: &[TableId], publication_name: Option<&str>, - ) -> PgReplicationResult> { + ) -> EtlResult> { let mut table_schemas = HashMap::new(); // TODO: consider if we want to fail when at least one table was missing or not. @@ -566,7 +534,7 @@ impl PgReplicationClient { let table_schema = self.get_table_schema(*table_id, publication_name).await?; // TODO: this warning and skipping should not happen in this method, - // but rather higher in the stack. + // but rather higher in the stack. if !table_schema.has_primary_keys() { warn!( "table {} with id {} will not be copied because it has no primary key", @@ -589,7 +557,7 @@ impl PgReplicationClient { &self, table_id: TableId, publication: Option<&str>, - ) -> PgReplicationResult { + ) -> EtlResult { let table_name = self.get_table_name(table_id).await?; let column_schemas = self.get_column_schemas(table_id, publication).await?; @@ -603,7 +571,7 @@ impl PgReplicationClient { /// Loads the table name and schema information for a given table OID. /// /// Returns a `TableName` containing both the schema and table name. - async fn get_table_name(&self, table_id: TableId) -> PgReplicationResult { + async fn get_table_name(&self, table_id: TableId) -> EtlResult { let table_info_query = format!( "select n.nspname as schema_name, c.relname as table_name from pg_class c @@ -625,10 +593,11 @@ impl PgReplicationClient { } } - Err(PgReplicationError::TableNotFound(TableName { - schema: String::new(), - name: format!("oid: {table_id}"), - })) + bail!( + ErrorKind::SourceSchemaError, + "Table not found", + format!("Table not found in database (table id: {})", table_id) + ); } /// Retrieves schema information for all columns in a table. @@ -639,7 +608,7 @@ impl PgReplicationClient { &self, table_id: TableId, publication: Option<&str>, - ) -> PgReplicationResult> { + ) -> EtlResult> { let (pub_cte, pub_pred) = if let Some(publication) = publication { ( format!( @@ -719,7 +688,7 @@ impl PgReplicationClient { &self, table_id: TableId, column_schemas: &[ColumnSchema], - ) -> PgReplicationResult { + ) -> EtlResult { let column_list = column_schemas .iter() .map(|col| quote_identifier(&col.name)) @@ -747,22 +716,27 @@ impl PgReplicationClient { row: &SimpleQueryRow, column_name: &str, table_name: &str, - ) -> PgReplicationResult + ) -> EtlResult where T::Err: fmt::Debug, { - let value = row - .try_get(column_name)? - .ok_or(PgReplicationError::ColumnNotFound( - column_name.to_string(), - table_name.to_string(), - ))?; + let value = row.try_get(column_name)?.ok_or(etl_error!( + ErrorKind::SourceSchemaError, + "Column not found", + format!( + "Column '{}' not found in table '{}'", + column_name, table_name + ) + ))?; value.parse().map_err(|e: T::Err| { - PgReplicationError::ColumnParsingFailed( - column_name.to_string(), - table_name.to_string(), - format!("{e:?}"), + etl_error!( + ErrorKind::ConversionError, + "Column parsing failed", + format!( + "Failed to parse value from column '{}' in table '{}': {:?}", + column_name, table_name, e + ) ) }) } diff --git a/etl/src/replication/common.rs b/etl/src/replication/common.rs index 5e2e6665c..27fb68c94 100644 --- a/etl/src/replication/common.rs +++ b/etl/src/replication/common.rs @@ -1,9 +1,9 @@ use postgres::schema::TableId; use std::collections::HashMap; +use crate::error::EtlResult; use crate::state::store::base::StateStore; use crate::state::table::TableReplicationPhase; -use crate::workers::apply::ApplyWorkerHookError; /// Returns the table replication states that are either done or in active state. /// @@ -12,7 +12,7 @@ use crate::workers::apply::ApplyWorkerHookError; pub async fn get_table_replication_states( state_store: &S, done: bool, -) -> Result, ApplyWorkerHookError> +) -> EtlResult> where S: StateStore + Clone + Send + Sync + 'static, { diff --git a/etl/src/replication/slot.rs b/etl/src/replication/slot.rs index 4fd9b3183..d7cf82867 100644 --- a/etl/src/replication/slot.rs +++ b/etl/src/replication/slot.rs @@ -1,5 +1,6 @@ -use thiserror::Error; - +use crate::bail; +use crate::error::EtlError; +use crate::error::{ErrorKind, EtlResult}; use crate::pipeline::PipelineId; use crate::workers::base::WorkerType; @@ -10,20 +11,8 @@ const MAX_SLOT_NAME_LENGTH: usize = 63; const APPLY_WORKER_PREFIX: &str = "supabase_etl_apply"; const TABLE_SYNC_PREFIX: &str = "supabase_etl_table_sync"; -/// Error types that can occur when working with replication slots -#[derive(Debug, Error)] -pub enum SlotError { - #[error( - "Replication slot name exceeds maximum length of {MAX_SLOT_NAME_LENGTH} characters: name must be shorter" - )] - NameTooLong, -} - /// Generates a replication slot name. -pub fn get_slot_name( - pipeline_id: PipelineId, - worker_type: WorkerType, -) -> Result { +pub fn get_slot_name(pipeline_id: PipelineId, worker_type: WorkerType) -> EtlResult { let slot_name = match worker_type { WorkerType::Apply => { format!("{APPLY_WORKER_PREFIX}_{pipeline_id}") @@ -34,7 +23,10 @@ pub fn get_slot_name( }; if slot_name.len() > MAX_SLOT_NAME_LENGTH { - return Err(SlotError::NameTooLong); + bail!( + ErrorKind::ValidationError, + "Invalid slot name length: {slot_name}" + ); } Ok(slot_name) diff --git a/etl/src/replication/stream.rs b/etl/src/replication/stream.rs index 31434e212..3f91d9bc2 100644 --- a/etl/src/replication/stream.rs +++ b/etl/src/replication/stream.rs @@ -1,4 +1,3 @@ -use crate::conversions::table_row::{TableRow, TableRowConversionError, TableRowConverter}; use futures::{Stream, ready}; use pin_project_lite::pin_project; use postgres::schema::ColumnSchema; @@ -7,28 +6,20 @@ use postgres_replication::LogicalReplicationStream; use postgres_replication::protocol::{LogicalReplicationMessage, ReplicationMessage}; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::{Duration, Instant, SystemTimeError}; -use thiserror::Error; +use std::time::{Duration, Instant}; use tokio_postgres::CopyOutStream; use tokio_postgres::types::PgLsn; use tracing::debug; +use crate::conversions::table_row::{TableRow, TableRowConverter}; +use crate::error::EtlError; +use crate::error::{ErrorKind, EtlResult}; +use crate::etl_error; + /// The amount of milliseconds between two consecutive status updates in case no forced update /// is requested. const STATUS_UPDATE_INTERVAL: Duration = Duration::from_millis(100); -/// Errors that can occur while streaming table copy data. -#[derive(Debug, Error)] -pub enum TableCopyStreamError { - /// An error occurred when copying table data from the stream. - #[error("An error occurred when copying table data from the stream: {0}")] - TableCopyFailed(#[from] tokio_postgres::Error), - - /// An error occurred while converting a table row during table copy. - #[error("An error occurred while converting a table row during table copy: {0}")] - Conversion(#[from] TableRowConversionError), -} - pin_project! { /// A stream that yields rows from a PostgreSQL COPY operation. /// @@ -56,7 +47,7 @@ impl<'a> TableCopyStream<'a> { } impl<'a> Stream for TableCopyStream<'a> { - type Item = Result; + type Item = EtlResult; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); @@ -64,7 +55,7 @@ impl<'a> Stream for TableCopyStream<'a> { // TODO: allow pluggable table row conversion based on if the data is in text or binary format. Some(Ok(row)) => match TableRowConverter::try_from(&row, this.column_schemas) { Ok(row) => Poll::Ready(Some(Ok(row))), - Err(err) => Poll::Ready(Some(Err(err.into()))), + Err(err) => Poll::Ready(Some(Err(err))), }, Some(Err(err)) => Poll::Ready(Some(Err(err.into()))), None => Poll::Ready(None), @@ -72,18 +63,6 @@ impl<'a> Stream for TableCopyStream<'a> { } } -/// Errors that can occur while streaming logical replication events. -#[derive(Debug, Error)] -pub enum EventsStreamError { - /// An error occurred when copying table data from the stream. - #[error("An error occurred when copying table data from the stream: {0}")] - TableCopyFailed(#[from] tokio_postgres::Error), - - /// An error occurred while calculating the elapsed time since PostgreSQL epoch. - #[error("An error occurred while determining the elapsed time: {0}")] - EpochCalculationFailed(#[from] SystemTimeError), -} - pin_project! { pub struct EventsStream { #[pin] @@ -120,7 +99,7 @@ impl EventsStream { flush_lsn: PgLsn, apply_lsn: PgLsn, force: bool, - ) -> Result<(), EventsStreamError> { + ) -> EtlResult<()> { let this = self.project(); // If we are not forced to send an update, we can willingly do so based on a set of conditions. @@ -151,7 +130,16 @@ impl EventsStream { // The client's system clock at the time of transmission, as microseconds since midnight // on 2000-01-01. - let ts = POSTGRES_EPOCH.elapsed()?.as_micros() as i64; + let ts = POSTGRES_EPOCH + .elapsed() + .map_err(|e| { + etl_error!( + ErrorKind::InvalidState, + "Invalid Postgres epoch", + e.to_string() + ) + })? + .as_micros() as i64; this.stream .standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, 0) @@ -172,7 +160,7 @@ impl EventsStream { } impl Stream for EventsStream { - type Item = Result, EventsStreamError>; + type Item = EtlResult>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); diff --git a/etl/src/replication/table_sync.rs b/etl/src/replication/table_sync.rs index f28d3d50f..8e7f6415c 100644 --- a/etl/src/replication/table_sync.rs +++ b/etl/src/replication/table_sync.rs @@ -2,55 +2,32 @@ use config::shared::PipelineConfig; #[cfg(feature = "failpoints")] use fail::fail_point; use futures::StreamExt; -use postgres::schema::{TableId, TableName}; +use postgres::schema::TableId; use std::sync::Arc; -use thiserror::Error; use tokio::pin; use tokio_postgres::types::PgLsn; -use tracing::{error, info, warn}; +use tracing::{info, warn}; +use crate::bail; use crate::concurrency::shutdown::{ShutdownResult, ShutdownRx}; use crate::concurrency::signal::SignalTx; use crate::concurrency::stream::BatchStream; -use crate::destination::base::{Destination, DestinationError}; -#[cfg(feature = "failpoints")] -use crate::failpoints::START_TABLE_SYNC_AFTER_DATA_SYNC; +use crate::destination::base::Destination; +use crate::error::{ErrorKind, EtlError, EtlResult}; use crate::pipeline::PipelineId; -use crate::replication::client::{PgReplicationClient, PgReplicationError}; -use crate::replication::slot::{SlotError, get_slot_name}; -use crate::replication::stream::{TableCopyStream, TableCopyStreamError}; +use crate::replication::client::PgReplicationClient; +use crate::replication::slot::get_slot_name; +use crate::replication::stream::TableCopyStream; use crate::schema::cache::SchemaCache; -use crate::state::store::base::{StateStore, StateStoreError}; +use crate::state::store::base::StateStore; use crate::state::table::{TableReplicationPhase, TableReplicationPhaseType}; use crate::workers::base::WorkerType; -use crate::workers::table_sync::{TableSyncWorkerState, TableSyncWorkerStateError}; - -#[derive(Debug, Error)] -pub enum TableSyncError { - #[error("Invalid replication phase '{0}': expected Init, DataSync, or FinishedCopy")] - InvalidPhase(TableReplicationPhaseType), - - #[error("Invalid replication slot name: {0}")] - InvalidSlotName(#[from] SlotError), - - #[error("PostgreSQL replication operation failed: {0}")] - PgReplication(#[from] PgReplicationError), - - #[error("An error occurred while interacting with the table sync worker state: {0}")] - TableSyncWorkerState(#[from] TableSyncWorkerStateError), - - #[error("An error occurred while writing to the destination: {0}")] - Destination(#[from] DestinationError), - - #[error("An error happened in the state store: {0}")] - StateStore(#[from] StateStoreError), - - #[error("An error happened in the table copy stream")] - TableCopyStream(#[from] TableCopyStreamError), - - #[error("table {0} has no primary key")] - MissingPrimaryKey(TableName), -} +use crate::workers::table_sync::TableSyncWorkerState; +#[cfg(feature = "failpoints")] +use crate::{ + failpoints::START_TABLE_SYNC_AFTER_DATA_SYNC_ERROR, + failpoints::START_TABLE_SYNC_AFTER_DATA_SYNC_PANIC, +}; #[derive(Debug)] pub enum TableSyncResult { @@ -71,7 +48,7 @@ pub async fn start_table_sync( destination: D, shutdown_rx: ShutdownRx, force_syncing_tables_tx: SignalTx, -) -> Result +) -> EtlResult where S: StateStore + Clone + Send + 'static, D: Destination + Clone + Send + 'static, @@ -112,7 +89,14 @@ where phase_type, table_id ); - return Err(TableSyncError::InvalidPhase(phase_type)); + bail!( + ErrorKind::InvalidState, + "Invalid replication phase", + format!( + "Invalid replication phase '{}': expected Init, DataSync, or FinishedCopy", + phase_type + ) + ); } phase_type @@ -146,8 +130,8 @@ where // before starting a table copy. if let Err(err) = replication_client.delete_slot(&slot_name).await { // If the slot is not found, we are safe to continue, for any other error, we bail. - if !matches!(err, PgReplicationError::SlotNotFound(_)) { - return Err(err.into()); + if err.kind() != ErrorKind::ReplicationSlotNotFound { + return Err(err); } } } @@ -163,7 +147,14 @@ where // Fail point to test when the table sync fails. #[cfg(feature = "failpoints")] - fail_point!(START_TABLE_SYNC_AFTER_DATA_SYNC); + fail_point!(START_TABLE_SYNC_AFTER_DATA_SYNC_PANIC); + #[cfg(feature = "failpoints")] + fail_point!(START_TABLE_SYNC_AFTER_DATA_SYNC_ERROR, |_| { + bail!( + ErrorKind::Unknown, + "An unknown error has occurred before copying the table" + ); + }); // We create the slot with a transaction, since we need to have a consistent snapshot of the database // before copying the schema and tables. @@ -190,7 +181,12 @@ where state_store .update_table_replication_state(table_id, TableReplicationPhase::Skipped) .await?; - return Err(TableSyncError::MissingPrimaryKey(table_schema.name)); + + bail!( + ErrorKind::SourceSchemaError, + "Missing primary key", + format!("table {} has no primary key", table_schema.name) + ); } schema_cache.add_table_schema(table_schema.clone()).await; diff --git a/etl/src/state/store/base.rs b/etl/src/state/store/base.rs index 5d13fc735..1c24829b3 100644 --- a/etl/src/state/store/base.rs +++ b/etl/src/state/store/base.rs @@ -1,35 +1,8 @@ use postgres::schema::TableId; use std::{collections::HashMap, future::Future}; -use thiserror::Error; -use crate::{ - replication::slot::SlotError, - state::{ - store::postgres::{FromTableStateError, ToTableStateError}, - table::TableReplicationPhase, - }, -}; - -#[derive(Debug, Error)] -pub enum StateStoreError { - #[error("Sqlx error in state store: {0}")] - Database(#[from] sqlx::Error), - - #[error("Slot error in state store: {0}")] - Slot(#[from] SlotError), - - #[error("Invalid confirmed flush lsn value in state store: {0}")] - InvalidConfirmedFlushLsn(String), - - #[error("Missing slot in state store: {0}")] - MissingSlot(String), - - #[error("Error converting from table replication phase to table state")] - ToTableState(#[from] ToTableStateError), - - #[error("Error converting from table state to table replication phase")] - FromTableState(#[from] FromTableStateError), -} +use crate::error::EtlResult; +use crate::state::table::TableReplicationPhase; /// This trait represents a state store for the replication state of all tables. /// It assumes that the implementers keep a cache of the state to avoid having @@ -41,28 +14,26 @@ pub trait StateStore { fn get_table_replication_state( &self, table_id: TableId, - ) -> impl Future, StateStoreError>> + Send; + ) -> impl Future>> + Send; /// Returns the table replication states for all the tables from the cache. /// Does not read from the persistent store. fn get_table_replication_states( &self, - ) -> impl Future, StateStoreError>> + Send; + ) -> impl Future>> + Send; /// Loads the table replication states from the persistent state into the cache. - /// This should called once at program start to load the state into the cache + /// This should be called once at program start to load the state into the cache /// and then use only the `get_X` methods to access the state. Updating the state /// by calling the `update_table_replication_state` updates in both the cache and /// the persistent store, so no need to ever load the state again. - fn load_table_replication_states( - &self, - ) -> impl Future> + Send; + fn load_table_replication_states(&self) -> impl Future> + Send; - /// Updates the table replicate state for a table with `table_id` in both the cache as well as + /// Updates the table replicate state for a table with `table_id` in both the cache and /// the persistent store. fn update_table_replication_state( &self, table_id: TableId, state: TableReplicationPhase, - ) -> impl Future> + Send; + ) -> impl Future> + Send; } diff --git a/etl/src/state/store/memory.rs b/etl/src/state/store/memory.rs index fbba0e1a9..316a8a1f4 100644 --- a/etl/src/state/store/memory.rs +++ b/etl/src/state/store/memory.rs @@ -3,7 +3,8 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Mutex; -use crate::state::store::base::{StateStore, StateStoreError}; +use crate::error::EtlResult; +use crate::state::store::base::StateStore; use crate::state::table::TableReplicationPhase; #[derive(Debug)] @@ -38,7 +39,7 @@ impl StateStore for MemoryStateStore { async fn get_table_replication_state( &self, table_id: TableId, - ) -> Result, StateStoreError> { + ) -> EtlResult> { let inner = self.inner.lock().await; Ok(inner.table_replication_states.get(&table_id).cloned()) @@ -46,13 +47,13 @@ impl StateStore for MemoryStateStore { async fn get_table_replication_states( &self, - ) -> Result, StateStoreError> { + ) -> EtlResult> { let inner = self.inner.lock().await; Ok(inner.table_replication_states.clone()) } - async fn load_table_replication_states(&self) -> Result { + async fn load_table_replication_states(&self) -> EtlResult { let inner = self.inner.lock().await; Ok(inner.table_replication_states.len()) @@ -62,7 +63,7 @@ impl StateStore for MemoryStateStore { &self, table_id: TableId, state: TableReplicationPhase, - ) -> Result<(), StateStoreError> { + ) -> EtlResult<()> { let mut inner = self.inner.lock().await; inner.table_replication_states.insert(table_id, state); diff --git a/etl/src/state/store/notify.rs b/etl/src/state/store/notify.rs index c42d79e28..299d7be2d 100644 --- a/etl/src/state/store/notify.rs +++ b/etl/src/state/store/notify.rs @@ -6,8 +6,9 @@ use tokio::{ sync::{Notify, RwLock}, }; +use crate::error::EtlResult; use crate::state::{ - store::base::{StateStore, StateStoreError}, + store::base::StateStore, table::{TableReplicationPhase, TableReplicationPhaseType}, }; @@ -105,7 +106,7 @@ impl StateStore for NotifyingStateStore { async fn get_table_replication_state( &self, table_id: TableId, - ) -> Result, StateStoreError> { + ) -> EtlResult> { let inner = self.inner.read().await; let result = Ok(inner.table_replication_states.get(&table_id).cloned()); @@ -118,7 +119,7 @@ impl StateStore for NotifyingStateStore { async fn get_table_replication_states( &self, - ) -> Result, StateStoreError> { + ) -> EtlResult> { let inner = self.inner.read().await; let result = Ok(inner.table_replication_states.clone()); @@ -129,7 +130,7 @@ impl StateStore for NotifyingStateStore { result } - async fn load_table_replication_states(&self) -> Result { + async fn load_table_replication_states(&self) -> EtlResult { let inner = self.inner.read().await; let result = Ok(inner.table_replication_states.clone()); @@ -144,7 +145,7 @@ impl StateStore for NotifyingStateStore { &self, table_id: TableId, state: TableReplicationPhase, - ) -> Result<(), StateStoreError> { + ) -> EtlResult<()> { let mut inner = self.inner.write().await; inner.table_replication_states.insert(table_id, state); inner.check_conditions().await; diff --git a/etl/src/state/store/postgres.rs b/etl/src/state/store/postgres.rs index f1fff441e..c8c387701 100644 --- a/etl/src/state/store/postgres.rs +++ b/etl/src/state/store/postgres.rs @@ -7,29 +7,20 @@ use postgres::replication::{ }; use postgres::schema::TableId; use sqlx::PgPool; -use thiserror::Error; use tokio::sync::Mutex; use tokio_postgres::types::PgLsn; use tracing::{debug, info}; -use crate::{ - pipeline::PipelineId, - state::{ - store::base::{StateStore, StateStoreError}, - table::TableReplicationPhase, - }, -}; +use crate::error::{ErrorKind, EtlError, EtlResult}; +use crate::pipeline::PipelineId; +use crate::state::store::base::StateStore; +use crate::state::table::TableReplicationPhase; +use crate::{bail, etl_error}; const NUM_POOL_CONNECTIONS: u32 = 1; -#[derive(Debug, Error)] -pub enum ToTableStateError { - #[error("In-memory table replication phase can't be saved in the state store")] - InMemoryPhase, -} - impl TryFrom for (TableReplicationState, Option) { - type Error = ToTableStateError; + type Error = EtlError; fn try_from(value: TableReplicationPhase) -> Result { Ok(match value { @@ -42,18 +33,16 @@ impl TryFrom for (TableReplicationState, Option) TableReplicationPhase::Ready => (TableReplicationState::Ready, None), TableReplicationPhase::Skipped => (TableReplicationState::Skipped, None), TableReplicationPhase::SyncWait | TableReplicationPhase::Catchup { .. } => { - return Err(ToTableStateError::InMemoryPhase); + bail!( + ErrorKind::InvalidState, + "In-memory phase error", + "In-memory table replication phase can't be saved in the state store" + ); } }) } } -#[derive(Debug, Error)] -pub enum FromTableStateError { - #[error("Lsn can't be missing from the state store if state is SyncDone")] - MissingSyncDoneLsn, -} - #[derive(Debug)] struct Inner { table_states: HashMap, @@ -69,11 +58,12 @@ pub struct PostgresStateStore { } impl PostgresStateStore { - pub fn new(pipeline_id: PipelineId, source_config: PgConnectionConfig) -> PostgresStateStore { + pub fn new(pipeline_id: PipelineId, source_config: PgConnectionConfig) -> Self { let inner = Inner { table_states: HashMap::new(), }; - PostgresStateStore { + + Self { pipeline_id, source_config, inner: Arc::new(Mutex::new(inner)), @@ -118,19 +108,30 @@ impl PostgresStateStore { &self, state: &TableReplicationState, sync_done_lsn: Option, - ) -> Result { + ) -> EtlResult { Ok(match state { TableReplicationState::Init => TableReplicationPhase::Init, TableReplicationState::DataSync => TableReplicationPhase::DataSync, TableReplicationState::FinishedCopy => TableReplicationPhase::FinishedCopy, TableReplicationState::SyncDone => match sync_done_lsn { Some(lsn_str) => { - let lsn = lsn_str - .parse::() - .map_err(|_| StateStoreError::InvalidConfirmedFlushLsn(lsn_str))?; + let lsn = lsn_str.parse::().map_err(|_| { + etl_error!( + ErrorKind::ValidationError, + "Invalid LSN", + format!( + "Invalid confirmed flush lsn value in state store: {}", + lsn_str + ) + ) + })?; TableReplicationPhase::SyncDone { lsn } } - None => return Err(FromTableStateError::MissingSyncDoneLsn)?, + None => bail!( + ErrorKind::ValidationError, + "Missing LSN", + "Lsn can't be missing from the state store if state is 'SyncDone'" + ), }, TableReplicationState::Ready => TableReplicationPhase::Ready, TableReplicationState::Skipped => TableReplicationPhase::Skipped, @@ -142,20 +143,21 @@ impl StateStore for PostgresStateStore { async fn get_table_replication_state( &self, table_id: TableId, - ) -> Result, StateStoreError> { + ) -> EtlResult> { let inner = self.inner.lock().await; Ok(inner.table_states.get(&table_id).cloned()) } async fn get_table_replication_states( &self, - ) -> Result, StateStoreError> { + ) -> EtlResult> { let inner = self.inner.lock().await; Ok(inner.table_states.clone()) } - async fn load_table_replication_states(&self) -> Result { + async fn load_table_replication_states(&self) -> EtlResult { debug!("loading table replication states from postgres state store"); + let pool = self.connect_to_source().await?; let replication_state_rows = self .get_all_replication_state_rows(&pool, self.pipeline_id) @@ -182,10 +184,11 @@ impl StateStore for PostgresStateStore { &self, table_id: TableId, state: TableReplicationPhase, - ) -> Result<(), StateStoreError> { + ) -> EtlResult<()> { let (table_state, sync_done_lsn) = state.try_into()?; self.update_replication_state(self.pipeline_id, table_id, table_state, sync_done_lsn) .await?; + let mut inner = self.inner.lock().await; inner.table_states.insert(table_id, state); diff --git a/etl/src/test_utils/test_destination_wrapper.rs b/etl/src/test_utils/test_destination_wrapper.rs index 787e948c4..d4cc00d45 100644 --- a/etl/src/test_utils/test_destination_wrapper.rs +++ b/etl/src/test_utils/test_destination_wrapper.rs @@ -7,7 +7,8 @@ use tokio::sync::{Notify, RwLock}; use crate::conversions::event::{Event, EventType}; use crate::conversions::table_row::TableRow; -use crate::destination::base::{Destination, DestinationError}; +use crate::destination::base::Destination; +use crate::error::EtlResult; use crate::schema::cache::SchemaCache; use crate::test_utils::event::check_events_count; @@ -156,7 +157,7 @@ impl TestDestinationWrapper { } impl Destination for TestDestinationWrapper { - async fn inject(&self, schema_cache: SchemaCache) -> Result<(), DestinationError> { + async fn inject(&self, schema_cache: SchemaCache) -> EtlResult<()> { let destination = { let inner = self.inner.read().await; inner.wrapped_destination.clone() @@ -165,7 +166,7 @@ impl Destination for TestDestinationWrappe destination.inject(schema_cache).await } - async fn write_table_schema(&self, table_schema: TableSchema) -> Result<(), DestinationError> { + async fn write_table_schema(&self, table_schema: TableSchema) -> EtlResult<()> { let destination = { let inner = self.inner.read().await; inner.wrapped_destination.clone() @@ -185,7 +186,7 @@ impl Destination for TestDestinationWrappe result } - async fn load_table_schemas(&self) -> Result, DestinationError> { + async fn load_table_schemas(&self) -> EtlResult> { let destination = { let inner = self.inner.read().await; inner.wrapped_destination.clone() @@ -198,7 +199,7 @@ impl Destination for TestDestinationWrappe &self, table_id: TableId, table_rows: Vec, - ) -> Result<(), DestinationError> { + ) -> EtlResult<()> { let destination = { let inner = self.inner.read().await; inner.wrapped_destination.clone() @@ -224,7 +225,7 @@ impl Destination for TestDestinationWrappe result } - async fn write_events(&self, events: Vec) -> Result<(), DestinationError> { + async fn write_events(&self, events: Vec) -> EtlResult<()> { let destination = { let inner = self.inner.read().await; inner.wrapped_destination.clone() diff --git a/etl/src/workers/apply.rs b/etl/src/workers/apply.rs index a43553cfe..d21fd76c3 100644 --- a/etl/src/workers/apply.rs +++ b/etl/src/workers/apply.rs @@ -1,7 +1,7 @@ +use crate::error::{ErrorKind, EtlError, EtlResult}; use config::shared::PipelineConfig; use postgres::schema::TableId; use std::sync::Arc; -use thiserror::Error; use tokio::sync::Semaphore; use tokio::task::JoinHandle; use tokio_postgres::types::PgLsn; @@ -10,64 +10,39 @@ use tracing::{Instrument, debug, error, info}; use crate::concurrency::shutdown::ShutdownRx; use crate::concurrency::signal::{SignalTx, create_signal}; use crate::destination::base::Destination; +use crate::etl_error; use crate::pipeline::PipelineId; -use crate::replication::apply::{ApplyLoopError, ApplyLoopHook, start_apply_loop}; -use crate::replication::client::{PgReplicationClient, PgReplicationError}; +use crate::replication::apply::{ApplyLoopHook, start_apply_loop}; +use crate::replication::client::PgReplicationClient; use crate::replication::common::get_table_replication_states; -use crate::replication::slot::{SlotError, get_slot_name}; +use crate::replication::slot::get_slot_name; use crate::schema::cache::SchemaCache; -use crate::state::store::base::{StateStore, StateStoreError}; +use crate::state::store::base::StateStore; use crate::state::table::{TableReplicationPhase, TableReplicationPhaseType}; -use crate::workers::base::{Worker, WorkerHandle, WorkerType, WorkerWaitError}; +use crate::workers::base::{Worker, WorkerHandle, WorkerType}; use crate::workers::pool::TableSyncWorkerPool; -use crate::workers::table_sync::{ - TableSyncWorker, TableSyncWorkerError, TableSyncWorkerState, TableSyncWorkerStateError, -}; - -#[derive(Debug, Error)] -pub enum ApplyWorkerError { - #[error("An error occurred while interacting with the state store: {0}")] - StateStore(#[from] StateStoreError), - - #[error("An error occurred in the apply loop: {0}")] - ApplyLoop(#[from] ApplyLoopError), - - #[error("A Postgres replication error occurred in the apply loop: {0}")] - PgReplication(#[from] PgReplicationError), - - #[error("Could not generate slot name in the apply loop: {0}")] - Slot(#[from] SlotError), -} - -#[derive(Debug, Error)] -pub enum ApplyWorkerHookError { - #[error("An error occurred while interacting with the state store: {0}")] - StateStore(#[from] StateStoreError), - - #[error("An error occurred while interacting with the table sync worker state: {0}")] - TableSyncWorkerState(#[from] TableSyncWorkerStateError), - - #[error("An error occurred while trying to start the table sync worker: {0}")] - TableSyncWorkerStartedFailed(#[from] TableSyncWorkerError), - - #[error("A Postgres replication error occurred in the apply worker: {0}")] - PgReplication(#[from] PgReplicationError), -} +use crate::workers::table_sync::{TableSyncWorker, TableSyncWorkerState}; #[derive(Debug)] pub struct ApplyWorkerHandle { - handle: Option>>, + handle: Option>>, } impl WorkerHandle<()> for ApplyWorkerHandle { fn state(&self) {} - async fn wait(mut self) -> Result<(), WorkerWaitError> { + async fn wait(mut self) -> EtlResult<()> { let Some(handle) = self.handle.take() else { return Ok(()); }; - handle.await??; + handle.await.map_err(|err| { + etl_error!( + ErrorKind::ApplyWorkerPanic, + "A panic occurred in the apply worker", + err + ) + })??; Ok(()) } @@ -118,9 +93,9 @@ where S: StateStore + Clone + Send + Sync + 'static, D: Destination + Clone + Send + Sync + 'static, { - type Error = ApplyWorkerError; + type Error = EtlError; - async fn start(self) -> Result { + async fn start(self) -> EtlResult { info!("starting apply worker"); let apply_worker_span = tracing::info_span!( @@ -174,7 +149,7 @@ where async fn get_start_lsn( pipeline_id: PipelineId, replication_client: &PgReplicationClient, -) -> Result { +) -> EtlResult { let slot_name = get_slot_name(pipeline_id, WorkerType::Apply)?; // TODO: validate that we only create the slot when we first start replication which // means when all tables are in the Init state. In any other case we should raise an @@ -253,11 +228,7 @@ where ) } - async fn handle_syncing_table( - &self, - table_id: TableId, - current_lsn: PgLsn, - ) -> Result { + async fn handle_syncing_table(&self, table_id: TableId, current_lsn: PgLsn) -> EtlResult { let mut pool = self.pool.lock().await; let table_sync_worker_state = pool.get_active_worker_state(table_id); @@ -278,7 +249,7 @@ where table_id: TableId, table_sync_worker_state: TableSyncWorkerState, current_lsn: PgLsn, - ) -> Result { + ) -> EtlResult { let mut catchup_started = false; { let mut inner = table_sync_worker_state.get_inner().lock().await; @@ -330,9 +301,7 @@ where S: StateStore + Clone + Send + Sync + 'static, D: Destination + Clone + Send + Sync + 'static, { - type Error = ApplyWorkerHookError; - - async fn before_loop(&self, _start_lsn: PgLsn) -> Result { + async fn before_loop(&self, _start_lsn: PgLsn) -> EtlResult { info!("starting table sync workers before the main apply loop"); let active_table_replication_states = @@ -370,7 +339,7 @@ where &self, current_lsn: PgLsn, update_state: bool, - ) -> Result { + ) -> EtlResult { let active_table_replication_states = get_table_replication_states(&self.state_store, false).await?; debug!( @@ -418,7 +387,7 @@ where Ok(true) } - async fn skip_table(&self, table_id: TableId) -> Result { + async fn skip_table(&self, table_id: TableId) -> EtlResult { let table_sync_worker_state = { let pool = self.pool.lock().await; pool.get_active_worker_state(table_id) @@ -443,7 +412,7 @@ where &self, table_id: TableId, remote_final_lsn: PgLsn, - ) -> Result { + ) -> EtlResult { let pool = self.pool.lock().await; // We try to load the state first from memory, if we don't find it, we try to load from the diff --git a/etl/src/workers/base.rs b/etl/src/workers/base.rs index 79892a608..f50622fdb 100644 --- a/etl/src/workers/base.rs +++ b/etl/src/workers/base.rs @@ -1,61 +1,7 @@ use postgres::schema::TableId; -use std::fmt; use std::future::Future; -use thiserror::Error; -use tokio::task; -use crate::workers::apply::ApplyWorkerError; -use crate::workers::table_sync::TableSyncWorkerError; - -/// Represents all possible errors that can occur while waiting for a worker to complete. -/// -/// This enum is used to distinguish between different failure modes that may arise during -/// the execution of asynchronous worker tasks, including panics, explicit errors, and -/// silent failures that are internally handled but still indicate abnormal termination. -#[derive(Debug, Error)] -pub enum WorkerWaitError { - /// The worker's task panicked, causing its join handle to return an error. - /// - /// This typically indicates a bug or unrecoverable condition in the worker's logic. - #[error("Worker task panicked or was forcefully aborted: {0}")] - WorkerPanicked(#[from] task::JoinError), - - /// The worker's task failed internally, but the error was caught and not propagated. - /// - /// This can occur when using abstractions like `ReactiveFuture`, which catch both panics and - /// `Result::Err` values, and then notify the pool about the failure without letting it bubble up. - #[error("Worker task failed internally and the error was silently handled: {0}")] - WorkerSilentlyFailed(String), - - /// The apply worker encountered an error that was propagated via the handle's return value. - /// - /// This variant wraps the specific error returned by the apply worker. - #[error("Apply worker terminated with an error: {0}")] - ApplyWorkerFailed(#[from] ApplyWorkerError), - - /// The table sync worker encountered an error that was propagated via the handle's return value. - /// - /// This variant wraps the specific error returned by the table sync worker. - #[error("Table sync worker terminated with an error: {0}")] - TableSyncWorkerFailed(#[from] TableSyncWorkerError), -} - -#[derive(Debug)] -pub struct WorkerWaitErrors(pub Vec); - -impl fmt::Display for WorkerWaitErrors { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if self.0.is_empty() { - write!(f, "no worker failed.") - } else { - writeln!(f, "the workers failed with the following errors:")?; - for (i, err) in self.0.iter().enumerate() { - writeln!(f, " {}: {}", i + 1, err)?; - } - Ok(()) - } - } -} +use crate::error::EtlResult; /// The type of worker that is currently running. /// @@ -99,5 +45,5 @@ pub trait WorkerHandle { /// /// The future resolves to a [`Result`] indicating whether the worker completed successfully /// or encountered an error. - fn wait(self) -> impl Future> + Send; + fn wait(self) -> impl Future> + Send; } diff --git a/etl/src/workers/pool.rs b/etl/src/workers/pool.rs index de91718be..3e0671479 100644 --- a/etl/src/workers/pool.rs +++ b/etl/src/workers/pool.rs @@ -8,11 +8,12 @@ use tracing::{debug, warn}; use crate::concurrency::future::ReactiveFutureCallback; use crate::destination::base::Destination; +use crate::error::EtlError; +use crate::error::{ErrorKind, EtlResult}; +use crate::etl_error; use crate::state::store::base::StateStore; -use crate::workers::base::{Worker, WorkerHandle, WorkerWaitError, WorkerWaitErrors}; -use crate::workers::table_sync::{ - TableSyncWorker, TableSyncWorkerError, TableSyncWorkerHandle, TableSyncWorkerState, -}; +use crate::workers::base::{Worker, WorkerHandle}; +use crate::workers::table_sync::{TableSyncWorker, TableSyncWorkerHandle, TableSyncWorkerState}; #[derive(Debug)] pub enum TableSyncWorkerInactiveReason { @@ -43,10 +44,7 @@ impl TableSyncWorkerPoolInner { } } - pub async fn start_worker( - &mut self, - worker: TableSyncWorker, - ) -> Result + pub async fn start_worker(&mut self, worker: TableSyncWorker) -> EtlResult where S: StateStore + Clone + Send + Sync + 'static, D: Destination + Clone + Send + Sync + 'static, @@ -95,7 +93,7 @@ impl TableSyncWorkerPoolInner { } } - pub async fn wait_all(&mut self) -> Result>, WorkerWaitErrors> { + pub async fn wait_all(&mut self) -> EtlResult>> { // If there are active workers, we return the notify, signaling that not all of them are // ready. // @@ -124,16 +122,20 @@ impl TableSyncWorkerPoolInner { // This should not happen since right now the `ReactiveFuture` is configured to // re-propagate the error after marking a table sync worker as finished. if let TableSyncWorkerInactiveReason::Errored(err) = finish { - errors.push(WorkerWaitError::WorkerSilentlyFailed(err)); + errors.push(etl_error!( + ErrorKind::TableSyncWorkerCaughtError, + "An error occurred in a table sync worker but was not propagated", + err + )); } } } - if errors.is_empty() { - Ok(None) - } else { - Err(WorkerWaitErrors(errors)) + if !errors.is_empty() { + return Err(errors.into()); } + + Ok(None) } } @@ -163,7 +165,7 @@ impl TableSyncWorkerPool { self.inner.clone() } - pub async fn wait_all(&self) -> Result<(), WorkerWaitErrors> { + pub async fn wait_all(&self) -> EtlResult<()> { loop { // We try first to wait for all workers to be finished, in case there are still active // workers, we get back a `Notify` which we will use to try again once new workers reported diff --git a/etl/src/workers/table_sync.rs b/etl/src/workers/table_sync.rs index f46337260..c89cb9f0f 100644 --- a/etl/src/workers/table_sync.rs +++ b/etl/src/workers/table_sync.rs @@ -2,8 +2,7 @@ use config::shared::PipelineConfig; use postgres::schema::TableId; use std::sync::Arc; use std::time::Duration; -use thiserror::Error; -use tokio::sync::{AcquireError, Mutex, MutexGuard, Notify, Semaphore}; +use tokio::sync::{Mutex, MutexGuard, Notify, Semaphore}; use tokio::task::JoinHandle; use tokio_postgres::types::PgLsn; use tracing::{Instrument, debug, error, info, warn}; @@ -12,16 +11,18 @@ use crate::concurrency::future::ReactiveFuture; use crate::concurrency::shutdown::{ShutdownResult, ShutdownRx}; use crate::concurrency::signal::SignalTx; use crate::destination::base::Destination; +use crate::error::{ErrorKind, EtlError, EtlResult}; use crate::pipeline::PipelineId; -use crate::replication::apply::{ApplyLoopError, ApplyLoopHook, start_apply_loop}; -use crate::replication::client::{PgReplicationClient, PgReplicationError}; +use crate::replication::apply::{ApplyLoopHook, start_apply_loop}; +use crate::replication::client::PgReplicationClient; use crate::replication::slot::get_slot_name; -use crate::replication::table_sync::{TableSyncError, TableSyncResult, start_table_sync}; +use crate::replication::table_sync::{TableSyncResult, start_table_sync}; use crate::schema::cache::SchemaCache; -use crate::state::store::base::{StateStore, StateStoreError}; +use crate::state::store::base::StateStore; use crate::state::table::{TableReplicationPhase, TableReplicationPhaseType}; -use crate::workers::base::{Worker, WorkerHandle, WorkerType, WorkerWaitError}; +use crate::workers::base::{Worker, WorkerHandle, WorkerType}; use crate::workers::pool::TableSyncWorkerPool; +use crate::{bail, etl_error}; /// Maximum time to wait for a phase change before trying again. const PHASE_CHANGE_REFRESH_FREQUENCY: Duration = Duration::from_millis(100); @@ -34,42 +35,6 @@ const PHASE_CHANGE_REFRESH_FREQUENCY: Duration = Duration::from_millis(100); /// of new tables. const MAX_DELETE_SLOT_WAIT: Duration = Duration::from_secs(30); -#[derive(Debug, Error)] -pub enum TableSyncWorkerError { - #[error("An error occurred while syncing a table: {0}")] - TableSync(#[from] TableSyncError), - - #[error("The replication state is missing for table {0}")] - ReplicationStateMissing(TableId), - - #[error("An error occurred while interacting with the state store: {0}")] - StateStore(#[from] StateStoreError), - - #[error("An error occurred in the apply loop: {0}")] - ApplyLoop(#[from] ApplyLoopError), - - #[error("Failed to acquire a permit to run a table sync worker")] - PermitAcquire(#[from] AcquireError), - - #[error("A Postgres replication error occurred in the table sync worker: {0}")] - PgReplication(#[from] PgReplicationError), -} - -#[derive(Debug, Error)] -pub enum TableSyncWorkerHookError { - #[error("An error occurred while updating the table sync worker state: {0}")] - TableSyncWorkerState(#[from] TableSyncWorkerStateError), - - #[error("A Postgres replication error occurred in the table sync worker: {0}")] - PgReplication(#[from] PgReplicationError), -} - -#[derive(Debug, Error)] -pub enum TableSyncWorkerStateError { - #[error("An error occurred while interacting with the state store: {0}")] - StateStore(#[from] StateStoreError), -} - #[derive(Debug)] pub struct TableSyncWorkerStateInner { table_id: TableId, @@ -98,7 +63,7 @@ impl TableSyncWorkerStateInner { &mut self, phase: TableReplicationPhase, state_store: S, - ) -> Result<(), TableSyncWorkerStateError> { + ) -> EtlResult<()> { self.set_phase(phase); // If we should store this phase change, we want to do it via the supplied state store. @@ -224,7 +189,7 @@ impl TableSyncWorkerState { #[derive(Debug)] pub struct TableSyncWorkerHandle { state: TableSyncWorkerState, - handle: Option>>, + handle: Option>>, } impl WorkerHandle for TableSyncWorkerHandle { @@ -232,12 +197,18 @@ impl WorkerHandle for TableSyncWorkerHandle { self.state.clone() } - async fn wait(mut self) -> Result<(), WorkerWaitError> { + async fn wait(mut self) -> EtlResult<()> { let Some(handle) = self.handle.take() else { return Ok(()); }; - handle.await??; + handle.await.map_err(|err| { + etl_error!( + ErrorKind::TableSyncWorkerPanic, + "A panic occurred in the table sync worker", + err + ) + })??; Ok(()) } @@ -295,9 +266,9 @@ where S: StateStore + Clone + Send + Sync + 'static, D: Destination + Clone + Send + Sync + 'static, { - type Error = TableSyncWorkerError; + type Error = EtlError; - async fn start(mut self) -> Result { + async fn start(mut self) -> EtlResult { info!("starting table sync worker for table {}", self.table_id); let Some(table_replication_phase) = self @@ -310,7 +281,14 @@ where self.table_id ); - return Err(TableSyncWorkerError::ReplicationStateMissing(self.table_id)); + bail!( + ErrorKind::InvalidState, + "Replication state missing", + format!( + "The replication state is missing for table {}", + self.table_id + ) + ); }; info!( @@ -381,7 +359,7 @@ where } Err(err) => { error!("table sync failed for table {}: {}", self.table_id, err); - return Err(err.into()); + return Err(err); } }; @@ -432,7 +410,8 @@ where // We spawn the table sync worker with a safe future, so that we can have controlled teardown // on completion or error. - let fut = ReactiveFuture::new(table_sync_worker, self.table_id, self.pool.get_inner()) + // TODO: we want to implement a custom callback source which can skip tables and react to panics. + let fut = ReactiveFuture::wrap(table_sync_worker, self.table_id, self.pool.get_inner()) .instrument(table_sync_worker_span); let handle = tokio::spawn(fut); @@ -472,11 +451,7 @@ where /// /// Returns `Ok(false)` when the worker is done with its work, signaling the caller that the apply /// loop should be stopped. - async fn try_advance_phase( - &self, - current_lsn: PgLsn, - update_state: bool, - ) -> Result { + async fn try_advance_phase(&self, current_lsn: PgLsn, update_state: bool) -> EtlResult { let mut inner = self.table_sync_worker_state.get_inner().lock().await; // If we caught up with the lsn, we mark this table as `SyncDone` and stop the worker. @@ -512,9 +487,7 @@ impl ApplyLoopHook for TableSyncWorkerHook where S: StateStore + Clone + Send + Sync + 'static, { - type Error = TableSyncWorkerHookError; - - async fn before_loop(&self, start_lsn: PgLsn) -> Result { + async fn before_loop(&self, start_lsn: PgLsn) -> EtlResult { info!("checking if the table sync worker is already caught up with the apply worker"); self.try_advance_phase(start_lsn, true).await @@ -531,7 +504,7 @@ where &self, current_lsn: PgLsn, update_state: bool, - ) -> Result { + ) -> EtlResult { info!( "processing syncing tables for table sync worker with lsn {}", current_lsn @@ -540,7 +513,7 @@ where self.try_advance_phase(current_lsn, update_state).await } - async fn skip_table(&self, table_id: TableId) -> Result { + async fn skip_table(&self, table_id: TableId) -> EtlResult { if self.table_id != table_id { return Ok(true); } @@ -557,7 +530,7 @@ where &self, table_id: TableId, _remote_final_lsn: PgLsn, - ) -> Result { + ) -> EtlResult { let inner = self.table_sync_worker_state.get_inner().lock().await; let is_skipped = matches!( inner.table_replication_phase.as_type(), diff --git a/etl/tests/failpoints/pipeline_test.rs b/etl/tests/failpoints/pipeline_test.rs index b87ba7213..0640fb965 100644 --- a/etl/tests/failpoints/pipeline_test.rs +++ b/etl/tests/failpoints/pipeline_test.rs @@ -1,9 +1,11 @@ use etl::destination::memory::MemoryDestination; -use etl::failpoints::START_TABLE_SYNC_AFTER_DATA_SYNC; -use etl::pipeline::{PipelineError, PipelineId}; +use etl::error::ErrorKind; +use etl::failpoints::{ + START_TABLE_SYNC_AFTER_DATA_SYNC_ERROR, START_TABLE_SYNC_AFTER_DATA_SYNC_PANIC, +}; +use etl::pipeline::PipelineId; use etl::state::store::notify::NotifyingStateStore; use etl::state::table::TableReplicationPhaseType; -use etl::workers::base::WorkerWaitError; use fail::FailScenario; use rand::random; use telemetry::init_test_tracing; @@ -11,17 +13,14 @@ use telemetry::init_test_tracing; use etl::test_utils::database::spawn_database; use etl::test_utils::pipeline::create_pipeline; use etl::test_utils::test_destination_wrapper::TestDestinationWrapper; -use etl::test_utils::test_schema::{TableSelection, setup_test_database_schema}; +use etl::test_utils::test_schema::{TableSelection, insert_mock_data, setup_test_database_schema}; -/* -Tests that we want to add: -TBD - */ +// TODO: add more tests with fault injection. #[tokio::test(flavor = "multi_thread")] async fn pipeline_handles_table_sync_worker_panic_during_data_sync() { let _scenario = FailScenario::setup(); - fail::cfg(START_TABLE_SYNC_AFTER_DATA_SYNC, "panic").unwrap(); + fail::cfg(START_TABLE_SYNC_AFTER_DATA_SYNC_PANIC, "panic").unwrap(); init_test_tracing(); @@ -61,25 +60,16 @@ async fn pipeline_handles_table_sync_worker_panic_during_data_sync() { orders_state_notify.notified().await; // We stop and inspect errors. - match pipeline.shutdown_and_wait().await.err().unwrap() { - PipelineError::OneOrMoreWorkersFailed(err) => { - assert!(matches!( - err.0.as_slice(), - [ - WorkerWaitError::WorkerPanicked(_), - WorkerWaitError::WorkerPanicked(_) - ] - )); - } - other => panic!("Expected TableSyncWorkersFailed error, but got: {other:?}"), - } + let err = pipeline.shutdown_and_wait().await.err().unwrap(); + assert_eq!(err.kinds().len(), 2); + assert_eq!(err.kinds()[0], ErrorKind::TableSyncWorkerPanic); + assert_eq!(err.kinds()[1], ErrorKind::TableSyncWorkerPanic); } -// TODO: inject the failure via fail-rs. -#[ignore] #[tokio::test(flavor = "multi_thread")] -async fn pipeline_handles_table_sync_worker_error() { +async fn pipeline_handles_table_sync_worker_error_during_data_sync() { let _scenario = FailScenario::setup(); + fail::cfg(START_TABLE_SYNC_AFTER_DATA_SYNC_ERROR, "return").unwrap(); init_test_tracing(); @@ -99,7 +89,7 @@ async fn pipeline_handles_table_sync_worker_error() { destination.clone(), ); - // Register notifications for when table sync is started. + // We register the interest in waiting for both table syncs to have started. let users_state_notify = state_store .notify_on_table_state( database_schema.users_schema().id, @@ -119,31 +109,33 @@ async fn pipeline_handles_table_sync_worker_error() { orders_state_notify.notified().await; // We stop and inspect errors. - match pipeline.shutdown_and_wait().await.err().unwrap() { - PipelineError::OneOrMoreWorkersFailed(err) => { - assert!(matches!( - err.0.as_slice(), - [ - WorkerWaitError::WorkerPanicked(_), - WorkerWaitError::WorkerPanicked(_) - ] - )); - } - other => panic!("Expected TableSyncWorkersFailed error, but got: {other:?}"), - } + let err = pipeline.shutdown_and_wait().await.err().unwrap(); + assert_eq!(err.kinds().len(), 2); + assert_eq!(err.kinds()[0], ErrorKind::Unknown); + assert_eq!(err.kinds()[1], ErrorKind::Unknown); } -// TODO: inject the failure via fail-rs. -#[ignore] #[tokio::test(flavor = "multi_thread")] -async fn table_schema_copy_retries_after_data_sync_failure() { +async fn table_copy_is_consistent_after_data_sync_threw_an_error() { let _scenario = FailScenario::setup(); + fail::cfg(START_TABLE_SYNC_AFTER_DATA_SYNC_ERROR, "return").unwrap(); init_test_tracing(); - let database = spawn_database().await; + let mut database = spawn_database().await; let database_schema = setup_test_database_schema(&database, TableSelection::Both).await; + // Insert initial test data. + let rows_inserted = 10; + insert_mock_data( + &mut database, + &database_schema.users_schema().name, + &database_schema.orders_schema().name, + 1..=rows_inserted, + false, + ) + .await; + let state_store = NotifyingStateStore::new(); let destination = TestDestinationWrapper::wrap(MemoryDestination::new()); @@ -176,11 +168,11 @@ async fn table_schema_copy_retries_after_data_sync_failure() { users_state_notify.notified().await; orders_state_notify.notified().await; - // This result could be an error or not based on if we manage to shut down before the error is - // thrown. This is a shortcoming of this fault injection implementation, we have plans to fix - // this in future PRs. - // TODO: assert error once better failure injection is implemented. - let _ = pipeline.shutdown_and_wait().await; + let err = pipeline.shutdown_and_wait().await.err().unwrap(); + assert_eq!(err.kinds().len(), 2); + + // We disable the failpoint. + fail::remove(START_TABLE_SYNC_AFTER_DATA_SYNC_ERROR); // Restart pipeline with normal state store to verify recovery. let mut pipeline = create_pipeline( @@ -198,13 +190,13 @@ async fn table_schema_copy_retries_after_data_sync_failure() { let users_state_notify = state_store .notify_on_table_state( database_schema.users_schema().id, - TableReplicationPhaseType::FinishedCopy, + TableReplicationPhaseType::SyncDone, ) .await; let orders_state_notify = state_store .notify_on_table_state( database_schema.orders_schema().id, - TableReplicationPhaseType::FinishedCopy, + TableReplicationPhaseType::SyncDone, ) .await; @@ -216,23 +208,12 @@ async fn table_schema_copy_retries_after_data_sync_failure() { pipeline.shutdown_and_wait().await.unwrap(); - // Verify table replication states. - let table_replication_states = state_store.get_table_replication_states().await; - assert_eq!(table_replication_states.len(), 2); - assert_eq!( - table_replication_states - .get(&database_schema.users_schema().id) - .unwrap() - .as_type(), - TableReplicationPhaseType::FinishedCopy - ); - assert_eq!( - table_replication_states - .get(&database_schema.orders_schema().id) - .unwrap() - .as_type(), - TableReplicationPhaseType::FinishedCopy - ); + // Verify copied data. + let table_rows = destination.get_table_rows().await; + let users_table_rows = table_rows.get(&database_schema.users_schema().id).unwrap(); + let orders_table_rows = table_rows.get(&database_schema.orders_schema().id).unwrap(); + assert_eq!(users_table_rows.len(), rows_inserted); + assert_eq!(orders_table_rows.len(), rows_inserted); // Verify table schemas were correctly stored. let table_schemas = destination.get_table_schemas().await; diff --git a/etl/tests/integration/replication_test.rs b/etl/tests/integration/replication_test.rs index 33e74e1ab..281c0846a 100644 --- a/etl/tests/integration/replication_test.rs +++ b/etl/tests/integration/replication_test.rs @@ -1,4 +1,5 @@ -use etl::replication::client::{PgReplicationClient, PgReplicationError}; +use etl::error::ErrorKind; +use etl::replication::client::PgReplicationClient; use futures::StreamExt; use postgres::schema::ColumnSchema; use postgres::tokio::test_utils::{TableModification, id_column_schema}; @@ -120,7 +121,7 @@ async fn test_create_and_delete_slot() { // Verify the slot no longer exists let result = client.get_slot(&slot_name).await; - assert!(matches!(result, Err(PgReplicationError::SlotNotFound(_)))); + assert!(matches!(result, Err(ref err) if err.kind() == ErrorKind::ReplicationSlotNotFound)); } #[tokio::test(flavor = "multi_thread")] @@ -136,7 +137,7 @@ async fn test_delete_nonexistent_slot() { // Attempt to delete a slot that doesn't exist let result = client.delete_slot(&slot_name).await; - assert!(matches!(result, Err(PgReplicationError::SlotNotFound(_)))); + assert!(matches!(result, Err(ref err) if err.kind() == ErrorKind::ReplicationSlotNotFound)); } #[tokio::test(flavor = "multi_thread")] @@ -152,7 +153,7 @@ async fn test_replication_client_doesnt_recreate_slot() { assert!(client.create_slot(&slot_name).await.is_ok()); assert!(matches!( client.create_slot(&slot_name).await, - Err(PgReplicationError::SlotAlreadyExists(_)) + Err(ref err) if err.kind() == ErrorKind::ReplicationSlotAlreadyExists )); }