4 changes: 0 additions & 4 deletions etl/Cargo.toml
@@ -28,7 +28,6 @@ required-features = ["failpoints", "test-utils"]
config = { workspace = true }
postgres = { workspace = true, features = ["tokio", "replication"] }

async-trait = { workspace = true }
bigdecimal = { workspace = true, features = ["std"] }
bytes = { workspace = true }
byteorder = { workspace = true }
@@ -41,17 +40,14 @@ gcp-bigquery-client = { workspace = true, optional = true, features = [
] }
pg_escape = { workspace = true }
pin-project-lite = { workspace = true }
postgres-protocol = { workspace = true }
postgres-replication = { workspace = true }
prost = { workspace = true, optional = true }
rustls = { workspace = true, features = ["aws-lc-rs", "logging"] }
rustls-pemfile = { workspace = true, features = ["std"] }
rand = { workspace = true, features = ["std"] }
secrecy = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true, features = ["std"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = [
"rt-multi-thread",
"macros",
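
The dependency removals above line up with the code changes below: `async-trait` appears to be droppable because `Destination` now uses native async-fn-in-trait (stable since Rust 1.75), and `thiserror` because the hand-rolled error enums are folded into the crate-wide `EtlError`. A minimal sketch of the trait-side difference, using stand-in types rather than the crate's real `Event` and error machinery:

```rust
// Stand-ins so the sketch is self-contained; the real types live in etl.
pub struct Event;
pub type EtlResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

// Before: `async fn` in a trait required the async-trait proc macro,
// which desugared each method into a boxed future:
//
//     #[async_trait::async_trait]
//     pub trait Destination {
//         async fn write_events(&self, events: Vec<Event>) -> Result<(), DestinationError>;
//     }

// After: native async fn in trait, returning the unified result alias.
pub trait Destination {
    async fn write_events(&self, events: Vec<Event>) -> EtlResult<()>;
}
```
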
29 changes: 12 additions & 17 deletions etl/benches/table_copies.rs
@@ -36,7 +36,7 @@ use clap::{Parser, Subcommand, ValueEnum};
use config::shared::{BatchConfig, PgConnectionConfig, PipelineConfig, RetryConfig, TlsConfig};
use etl::{
conversions::{event::Event, table_row::TableRow},
destination::base::{Destination, DestinationError},
destination::base::Destination,
pipeline::Pipeline,
state::{store::notify::NotifyingStateStore, table::TableReplicationPhaseType},
};
@@ -45,6 +45,7 @@ use sqlx::postgres::PgPool;

#[cfg(feature = "bigquery")]
use etl::destination::bigquery::BigQueryDestination;
use etl::error::EtlResult;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
@@ -435,26 +436,23 @@ enum BenchDestination {
}

impl Destination for BenchDestination {
async fn inject(
&self,
schema_cache: etl::schema::cache::SchemaCache,
) -> Result<(), DestinationError> {
async fn inject(&self, schema_cache: etl::schema::cache::SchemaCache) -> EtlResult<()> {
match self {
BenchDestination::Null(dest) => dest.inject(schema_cache).await,
#[cfg(feature = "bigquery")]
BenchDestination::BigQuery(dest) => dest.inject(schema_cache).await,
}
}

async fn write_table_schema(&self, table_schema: TableSchema) -> Result<(), DestinationError> {
async fn write_table_schema(&self, table_schema: TableSchema) -> EtlResult<()> {
match self {
BenchDestination::Null(dest) => dest.write_table_schema(table_schema).await,
#[cfg(feature = "bigquery")]
BenchDestination::BigQuery(dest) => dest.write_table_schema(table_schema).await,
}
}

async fn load_table_schemas(&self) -> Result<Vec<TableSchema>, DestinationError> {
async fn load_table_schemas(&self) -> EtlResult<Vec<TableSchema>> {
match self {
BenchDestination::Null(dest) => dest.load_table_schemas().await,
#[cfg(feature = "bigquery")]
@@ -466,15 +464,15 @@ impl Destination for BenchDestination {
&self,
table_id: TableId,
table_rows: Vec<TableRow>,
) -> Result<(), DestinationError> {
) -> EtlResult<()> {
match self {
BenchDestination::Null(dest) => dest.write_table_rows(table_id, table_rows).await,
#[cfg(feature = "bigquery")]
BenchDestination::BigQuery(dest) => dest.write_table_rows(table_id, table_rows).await,
}
}

async fn write_events(&self, events: Vec<Event>) -> Result<(), DestinationError> {
async fn write_events(&self, events: Vec<Event>) -> EtlResult<()> {
match self {
BenchDestination::Null(dest) => dest.write_events(events).await,
#[cfg(feature = "bigquery")]
@@ -484,30 +482,27 @@ }
}

impl Destination for NullDestination {
async fn inject(
&self,
_schema_cache: etl::schema::cache::SchemaCache,
) -> Result<(), DestinationError> {
async fn inject(&self, _schema_cache: etl::schema::cache::SchemaCache) -> EtlResult<()> {
Ok(())
}

async fn write_table_schema(&self, _table_schema: TableSchema) -> Result<(), DestinationError> {
async fn write_table_schema(&self, _table_schema: TableSchema) -> EtlResult<()> {
Ok(())
}

async fn load_table_schemas(&self) -> Result<Vec<TableSchema>, DestinationError> {
async fn load_table_schemas(&self) -> EtlResult<Vec<TableSchema>> {
Ok(vec![])
}

async fn write_table_rows(
&self,
_table_id: TableId,
_table_rows: Vec<TableRow>,
) -> Result<(), DestinationError> {
) -> EtlResult<()> {
Ok(())
}

async fn write_events(&self, _events: Vec<Event>) -> Result<(), DestinationError> {
async fn write_events(&self, _events: Vec<Event>) -> EtlResult<()> {
Ok(())
}
}
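
Every method above now returns `etl::error::EtlResult<T>`, presumably an alias for `Result<T, EtlError>`, so the null and BigQuery arms share one error type and each match arm can forward the inner result untouched. A condensed sketch of the delegation pattern (the single `write` method and stand-in types are illustrative, not the crate's API):

```rust
// Illustrative stand-ins; the real trait has inject/write_* methods.
pub type EtlResult<T> = Result<T, EtlError>;

#[derive(Debug)]
pub struct EtlError;

pub trait Destination {
    async fn write(&self) -> EtlResult<()>;
}

pub struct NullDestination;

impl Destination for NullDestination {
    async fn write(&self) -> EtlResult<()> {
        Ok(()) // no-op sink: the baseline for measuring pipeline overhead
    }
}

pub enum BenchDestination {
    Null(NullDestination),
    // #[cfg(feature = "bigquery")] BigQuery(BigQueryDestination), ...
}

impl Destination for BenchDestination {
    async fn write(&self) -> EtlResult<()> {
        // One match per method: static dispatch to the active destination,
        // with no error conversion needed now that both arms return EtlResult.
        match self {
            BenchDestination::Null(dest) => dest.write().await,
        }
    }
}
```

The enum wrapper also sidesteps `dyn Destination`, which native async trait methods do not support without extra boxing.
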
60 changes: 10 additions & 50 deletions etl/src/clients/bigquery.rs
@@ -1,5 +1,5 @@
use crate::error::EtlResult;
use futures::StreamExt;
use gcp_bigquery_client::google::cloud::bigquery::storage::v1::RowError;
use gcp_bigquery_client::storage::{ColumnMode, StorageApi};
use gcp_bigquery_client::yup_oauth2::parse_service_account_key;
use gcp_bigquery_client::{
@@ -10,7 +10,6 @@ use gcp_bigquery_client::{
};
use postgres::schema::ColumnSchema;
use std::fmt;
use thiserror::Error;
use tokio_postgres::types::Type;
use tracing::info;

@@ -52,32 +51,6 @@ impl fmt::Display for BigQueryOperationType {
}
}

/// Collection of row errors returned from BigQuery streaming operations.
#[derive(Debug, Error)]
pub struct RowErrors(pub Vec<RowError>);

impl fmt::Display for RowErrors {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if !self.0.is_empty() {
for row_error in self.0.iter() {
writeln!(f, "{row_error:?}")?;
}
}

Ok(())
}
}

/// Errors that can occur when interacting with BigQuery.
#[derive(Debug, Error)]
pub enum BigQueryClientError {
#[error("An error occurred with BigQuery: {0}")]
BigQuery(#[from] BQError),

#[error("One or multiple errors: {0}")]
AppendRowErrors(#[from] RowErrors),
}

/// A client for interacting with Google BigQuery.
///
/// This client provides methods for managing tables, inserting data,
@@ -95,7 +68,7 @@ impl BigQueryClient {
pub async fn new_with_key_path(
project_id: String,
sa_key_path: &str,
) -> Result<BigQueryClient, BigQueryClientError> {
) -> EtlResult<BigQueryClient> {
let client = Client::from_service_account_key_file(sa_key_path).await?;

Ok(BigQueryClient { project_id, client })
@@ -105,10 +78,7 @@ impl BigQueryClient {
///
/// Parses the provided service account key string to authenticate with the
/// BigQuery API.
pub async fn new_with_key(
project_id: String,
sa_key: &str,
) -> Result<BigQueryClient, BigQueryClientError> {
pub async fn new_with_key(project_id: String, sa_key: &str) -> EtlResult<BigQueryClient> {
let sa_key = parse_service_account_key(sa_key).map_err(BQError::from)?;
let client = Client::from_service_account_key(sa_key, false).await?;

@@ -130,7 +100,7 @@ impl BigQueryClient {
table_id: &str,
column_schemas: &[ColumnSchema],
max_staleness_mins: Option<u16>,
) -> Result<bool, BigQueryClientError> {
) -> EtlResult<bool> {
if self.table_exists(dataset_id, table_id).await? {
return Ok(false);
}
@@ -148,7 +118,7 @@ impl BigQueryClient {
table_id: &str,
column_schemas: &[ColumnSchema],
max_staleness_mins: Option<u16>,
) -> Result<(), BigQueryClientError> {
) -> EtlResult<()> {
let full_table_name = self.full_table_name(dataset_id, table_id);

let columns_spec = Self::create_columns_spec(column_schemas);
@@ -168,11 +138,7 @@ }
}

/// Truncates a table in a BigQuery dataset.
pub async fn truncate_table(
&self,
dataset_id: &str,
table_id: &str,
) -> Result<(), BigQueryClientError> {
pub async fn truncate_table(&self, dataset_id: &str, table_id: &str) -> EtlResult<()> {
let full_table_name = self.full_table_name(dataset_id, table_id);

info!("Truncating table {full_table_name} in BigQuery");
@@ -189,11 +155,7 @@
/// # Panics
///
/// Panics if the query result does not contain the expected `table_exists` column.
pub async fn table_exists(
&self,
dataset_id: &str,
table_id: &str,
) -> Result<bool, BigQueryClientError> {
pub async fn table_exists(&self, dataset_id: &str, table_id: &str) -> EtlResult<bool> {
let table = self
.client
.table()
@@ -216,7 +178,7 @@
table_id: String,
table_descriptor: &TableDescriptor,
table_rows: Vec<TableRow>,
) -> Result<(), BigQueryClientError> {
) -> EtlResult<()> {
// We create a slice on table rows, which will be updated while the streaming progresses.
//
// Using a slice allows us to deallocate the vector only at the end of streaming, which leads
@@ -242,9 +204,7 @@
if let Some(append_rows_response) = append_rows_stream.next().await {
let append_rows_response = append_rows_response.map_err(BQError::from)?;
if !append_rows_response.row_errors.is_empty() {
return Err(BigQueryClientError::AppendRowErrors(RowErrors(
append_rows_response.row_errors,
)));
return Err(append_rows_response.row_errors.into());
}
}

@@ -258,7 +218,7 @@ }
}

/// Executes an SQL query and returns the result set.
pub async fn query(&self, request: QueryRequest) -> Result<ResultSet, BigQueryClientError> {
pub async fn query(&self, request: QueryRequest) -> EtlResult<ResultSet> {
let query_response = self.client.job().query(&self.project_id, request).await?;

Ok(ResultSet::new_from_query_response(query_response))
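
Deleting `RowErrors` and `BigQueryClientError` only works because `EtlError` can absorb both failure shapes: the `?` applied to `BQError` values and the bare `.into()` on `append_rows_response.row_errors` each imply a `From` conversion. A sketch of what those impls plausibly look like; the real definitions live in `etl::error` and almost certainly carry more structure, and the `BQError` import path is a best-effort guess:

```rust
use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::google::cloud::bigquery::storage::v1::RowError;

// Minimal stand-in for the unified error; only the conversion shape matters.
#[derive(Debug)]
pub struct EtlError {
    kind: &'static str,
    detail: String,
}

impl From<BQError> for EtlError {
    fn from(err: BQError) -> Self {
        EtlError { kind: "bigquery", detail: err.to_string() }
    }
}

impl From<Vec<RowError>> for EtlError {
    fn from(errors: Vec<RowError>) -> Self {
        // Stands in for the deleted RowErrors Display impl: fold every
        // per-row failure from the append stream into a single report.
        let detail = errors
            .iter()
            .map(|e| format!("{e:?}"))
            .collect::<Vec<_>>()
            .join("\n");
        EtlError { kind: "bigquery_row_errors", detail }
    }
}
```
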
12 changes: 6 additions & 6 deletions etl/src/concurrency/future.rs
@@ -59,7 +59,7 @@
///
/// The callback will be notified of either success or failure, and will receive the provided
/// identifier to track which future completed.
pub fn new(future: Fut, id: I, callback_source: Arc<Mutex<C>>) -> Self {
pub fn wrap(future: Fut, id: I, callback_source: Arc<Mutex<C>>) -> Self {
Self {
future: AssertUnwindSafe(future).catch_unwind(),
callback_fut: None,
@@ -271,7 +271,7 @@ mod tests {
let callback = Arc::new(Mutex::new(MockCallback::default()));
let table_id = 1;

let future = ReactiveFuture::new(ImmediateFuture, table_id, callback.clone());
let future = ReactiveFuture::wrap(ImmediateFuture, table_id, callback.clone());
future.await.unwrap();

let guard = callback.lock().await;
@@ -285,7 +285,7 @@
let table_id = 1;

// Test string panic
let future = AssertUnwindSafe(ReactiveFuture::new(
let future = AssertUnwindSafe(ReactiveFuture::wrap(
PanicFuture { panic_any: false },
table_id,
callback.clone(),
@@ -312,7 +312,7 @@
drop(guard);

// Test any panic
let future = AssertUnwindSafe(ReactiveFuture::new(
let future = AssertUnwindSafe(ReactiveFuture::wrap(
PanicFuture { panic_any: true },
table_id,
callback.clone(),
@@ -335,7 +335,7 @@
let table_id = 1;

let pending_future = PendingFuture::new();
let future = ReactiveFuture::new(pending_future, table_id, callback.clone());
let future = ReactiveFuture::wrap(pending_future, table_id, callback.clone());

// First poll should return pending
let mut pinned_future = Box::pin(future);
@@ -364,7 +364,7 @@
let callback = Arc::new(Mutex::new(MockCallback::default()));
let table_id = 1;

let future = ReactiveFuture::new(ErrorFuture, table_id, callback.clone());
let future = ReactiveFuture::wrap(ErrorFuture, table_id, callback.clone());
let result = future.await;

// Verify the error was propagated
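
The only change here is the rename `ReactiveFuture::new` → `ReactiveFuture::wrap`, plus the mechanical call-site updates in the tests. The new name better reflects that the constructor decorates an existing future with completion reporting rather than building one from scratch. In miniature, the behavior the wrapper adds (a simplified, function-based stand-in for the crate's actual type, showing only the happy path and none of the `catch_unwind` panic handling):

```rust
use std::future::Future;
use std::sync::Arc;
use tokio::sync::Mutex;

// Run the inner future, then report its completion (by id) to a shared
// callback sink, handing the output straight through to the caller.
async fn wrap<F: Future>(future: F, id: u32, done: Arc<Mutex<Vec<u32>>>) -> F::Output {
    let output = future.await;
    done.lock().await.push(id);
    output
}

#[tokio::main]
async fn main() {
    let done = Arc::new(Mutex::new(Vec::new()));
    let value = wrap(async { 42 }, 1, done.clone()).await;
    assert_eq!(value, 42);
    assert_eq!(*done.lock().await, vec![1]); // completion was recorded
}
```
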
18 changes: 9 additions & 9 deletions etl/src/conversions/bool.rs
@@ -1,17 +1,17 @@
use thiserror::Error;
use crate::bail;
use crate::error::EtlResult;
use crate::error::{ErrorKind, EtlError};

#[derive(Debug, Error)]
pub enum ParseBoolError {
#[error("invalid input value: {0}")]
InvalidInput(String),
}

pub fn parse_bool(s: &str) -> Result<bool, ParseBoolError> {
pub fn parse_bool(s: &str) -> EtlResult<bool> {
if s == "t" {
Ok(true)
} else if s == "f" {
Ok(false)
} else {
Err(ParseBoolError::InvalidInput(s.to_string()))
bail!(
ErrorKind::InvalidData,
"Invalid boolean value",
format!("Boolean value must be 't' or 'f' (received: {s})")
);
}
}
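
`parse_bool` accepts exactly the text-format booleans Postgres sends over the wire (`t` and `f`); anything else now surfaces through the crate-standard `bail!` path as `ErrorKind::InvalidData` instead of the bespoke `ParseBoolError`. A quick usage sketch from inside this module (the commented assertion assumes `EtlError` exposes its kind via an accessor, which is a guess about the API):

```rust
fn demo() -> EtlResult<()> {
    // Postgres renders booleans as 't' / 'f' in text format.
    assert!(parse_bool("t")?);
    assert!(!parse_bool("f")?);

    // Everything else is rejected via bail! with ErrorKind::InvalidData.
    let err = parse_bool("true").unwrap_err();
    // assert_eq!(err.kind(), ErrorKind::InvalidData); // hypothetical accessor
    drop(err);

    Ok(())
}
```
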