From 9bf2a4542220887499cf422b2bfc4b7e55bf5e5f Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Mon, 11 Aug 2025 13:06:57 +0200 Subject: [PATCH 1/7] feat(bigquery): Implement truncate with views --- etl-destinations/src/bigquery/client.rs | 43 ++++ etl-destinations/src/bigquery/core.rs | 261 ++++++++++++++++++++++-- 2 files changed, 284 insertions(+), 20 deletions(-) diff --git a/etl-destinations/src/bigquery/client.rs b/etl-destinations/src/bigquery/client.rs index 55bbfe0d8..7f3c565f6 100644 --- a/etl-destinations/src/bigquery/client.rs +++ b/etl-destinations/src/bigquery/client.rs @@ -62,6 +62,7 @@ impl fmt::Display for BigQueryOperationType { /// /// Provides methods for table management, data insertion, and query execution /// against BigQuery datasets with authentication and error handling. +#[derive(Clone)] pub struct BigQueryClient { project_id: BigQueryProjectId, client: Client, @@ -179,6 +180,48 @@ impl BigQueryClient { Ok(()) } + /// Creates or replaces a view that points to the specified versioned table. + /// + /// This is used during truncation operations to redirect the view to a new table version. + pub async fn create_or_replace_view( + &self, + dataset_id: &BigQueryDatasetId, + view_name: &BigQueryTableId, + target_table_id: &BigQueryTableId, + ) -> EtlResult<()> { + let full_view_name = self.full_table_name(dataset_id, view_name); + let full_target_table_name = self.full_table_name(dataset_id, target_table_id); + + info!("Creating/replacing view {full_view_name} pointing to {full_target_table_name}"); + + let query = format!( + "CREATE OR REPLACE VIEW {full_view_name} AS SELECT * FROM {full_target_table_name}" + ); + + let _ = self.query(QueryRequest::new(query)).await?; + + Ok(()) + } + + /// Drops a table from BigQuery. + /// + /// Executes a DROP TABLE statement to remove the table and all its data. 
+ pub async fn drop_table( + &self, + dataset_id: &BigQueryDatasetId, + table_id: &BigQueryTableId, + ) -> EtlResult<()> { + let full_table_name = self.full_table_name(dataset_id, table_id); + + info!("Dropping table {full_table_name} from BigQuery"); + + let query = format!("DROP TABLE IF EXISTS {full_table_name}"); + + let _ = self.query(QueryRequest::new(query)).await?; + + Ok(()) + } + /// Checks whether a table exists in the BigQuery dataset. /// /// Returns `true` if the table exists, `false` otherwise. diff --git a/etl-destinations/src/bigquery/core.rs b/etl-destinations/src/bigquery/core.rs index 71b5b7c5a..ead9c6006 100644 --- a/etl-destinations/src/bigquery/core.rs +++ b/etl-destinations/src/bigquery/core.rs @@ -77,6 +77,14 @@ struct Inner { /// Cache of table IDs that have been successfully created or verified to exist. /// This avoids redundant `create_table_if_missing` calls for known tables. created_tables: HashSet, + /// Tracks the current version number for each table to support truncation. + /// Maps base table name to its current version number (e.g., "schema_table" -> 2). + // TODO: add mapping in the store. + table_versions: HashMap, + /// Cache of views that have been created and the versioned table they point to. + /// This avoids redundant `CREATE OR REPLACE VIEW` calls for views that already point to the correct table. + /// Maps view name to the versioned table it currently points to. + created_views: HashMap, } /// A BigQuery destination that implements the ETL [`Destination`] trait. 
@@ -110,6 +118,8 @@ where max_staleness_mins, schema_store, created_tables: HashSet::new(), + table_versions: HashMap::new(), + created_views: HashMap::new(), }; Ok(Self { @@ -135,6 +145,8 @@ where max_staleness_mins, schema_store, created_tables: HashSet::new(), + table_versions: HashMap::new(), + created_views: HashMap::new(), }; Ok(Self { @@ -168,34 +180,40 @@ where ) })?; - let table_id = table_name_to_bigquery_table_id(&table_schema.name); + let base_table_id = table_name_to_bigquery_table_id(&table_schema.name); + let versioned_table_id = Self::get_versioned_table_name(inner, &base_table_id); - // Optimistically skip table creation if we've already seen this table - if !inner.created_tables.contains(&table_id) { + // Optimistically skip table creation if we've already seen this versioned table + if !inner.created_tables.contains(&versioned_table_id) { inner .client .create_table_if_missing( &inner.dataset_id, - &table_id, + &versioned_table_id, &table_schema.column_schemas, inner.max_staleness_mins, ) .await?; - // Add the table to the cache (with random eviction if needed) - Self::add_to_created_tables_cache(inner, table_id.clone()); + // Add the versioned table to the cache + Self::add_to_created_tables_cache(inner, versioned_table_id.clone()); - debug!("table {table_id} added to creation cache"); + debug!("versioned table {versioned_table_id} added to creation cache"); } else { - debug!("table {table_id} found in creation cache, skipping existence check"); + debug!( + "versioned table {versioned_table_id} found in creation cache, skipping existence check" + ); } + // Ensure view points to this versioned table (uses cache to avoid redundant operations) + Self::ensure_view_points_to_table(inner, &base_table_id, &versioned_table_id).await?; + let table_descriptor = BigQueryClient::column_schemas_to_table_descriptor( &table_schema.column_schemas, use_cdc_sequence_column, ); - Ok((table_id, table_descriptor)) + Ok((versioned_table_id, table_descriptor)) } /// 
Adds a table ID to the created tables cache if not present. @@ -217,6 +235,87 @@ where inner.created_tables.remove(table_id); } + /// Returns the versioned table name for the current version of a base table. + /// + /// If no version exists for the table, initializes it to version 0. + fn get_versioned_table_name( + inner: &mut Inner, + base_table_id: &BigQueryTableId, + ) -> BigQueryTableId { + let version = inner + .table_versions + .entry(base_table_id.clone()) + .or_insert(0); + format!("{}_{}", base_table_id, version) + } + + /// Returns the previous versioned table name for cleanup purposes. + fn get_previous_versioned_table_name( + inner: &Inner, + base_table_id: &BigQueryTableId, + ) -> Option { + inner + .table_versions + .get(base_table_id) + .and_then(|&version| { + if version > 0 { + Some(format!("{}_{}", base_table_id, version - 1)) + } else { + None + } + }) + } + + /// Increments the version for a table and returns the new versioned table name. + fn increment_table_version( + inner: &mut Inner, + base_table_id: &BigQueryTableId, + ) -> BigQueryTableId { + let version = inner + .table_versions + .entry(base_table_id.clone()) + .or_insert(0); + *version += 1; + format!("{}_{}", base_table_id, version) + } + + /// Checks if a view needs to be created or updated and performs the operation if needed. + /// + /// Returns `true` if the view was created/updated, `false` if it was already pointing to the correct table. 
+ async fn ensure_view_points_to_table( + inner: &mut Inner, + view_name: &BigQueryTableId, + target_table_id: &BigQueryTableId, + ) -> EtlResult { + // Check if the view already points to the correct table + if let Some(current_target) = inner.created_views.get(view_name) { + if current_target == target_table_id { + debug!( + "view {} already points to {}, skipping creation", + view_name, target_table_id + ); + return Ok(false); + } + } + + // Create or replace the view + inner + .client + .create_or_replace_view(&inner.dataset_id, view_name, target_table_id) + .await?; + + // Update cache + inner + .created_views + .insert(view_name.clone(), target_table_id.clone()); + + debug!( + "view {} created/updated to point to {}", + view_name, target_table_id + ); + Ok(true) + } + /// Handles streaming rows with fallback table creation on missing table errors. /// /// Attempts to stream rows to BigQuery. If streaming fails with a table not found error, @@ -408,25 +507,23 @@ where // Process truncate events if !truncate_events.is_empty() { - // Right now we are not processing truncate messages, but we do the streaming split - // just to try out if splitting the streaming affects performance so that we might - // need it down the line once we figure out a solution for truncation. - warn!( - "'TRUNCATE' events are not supported, skipping apply of {} 'TRUNCATE' events", + info!( + "Processing {} 'TRUNCATE' events with versioned table recreation", truncate_events.len() ); + self.process_truncate_events(truncate_events).await?; } } Ok(()) } - /// Processes truncate events by executing `TRUNCATE TABLE` statements in BigQuery. + /// Processes truncate events by creating new versioned tables and updating views. /// - /// Maps PostgreSQL table OIDs to BigQuery table names and issues truncate commands. 
- #[allow(dead_code)] + /// Maps PostgreSQL table OIDs to BigQuery table names, creates new versioned tables, + /// updates views to point to new tables, and schedules old table cleanup. async fn process_truncate_events(&self, truncate_events: Vec) -> EtlResult<()> { - let inner = self.inner.lock().await; + let mut inner = self.inner.lock().await; for truncate_event in truncate_events { for table_id in truncate_event.rel_ids { @@ -435,13 +532,63 @@ where .get_table_schema(&TableId::new(table_id)) .await? { + let base_table_id = table_name_to_bigquery_table_id(&table_schema.name); + + // Get the previous table name for cleanup + let previous_table_id = + Self::get_previous_versioned_table_name(&inner, &base_table_id); + + // Create the new versioned table + let new_versioned_table_id = + Self::increment_table_version(&mut inner, &base_table_id); + + info!( + "processing truncate for table {}: creating new version {}", + base_table_id, new_versioned_table_id + ); + + // Create the new table inner .client - .truncate_table( + .create_table( &inner.dataset_id, - &table_name_to_bigquery_table_id(&table_schema.name), + &new_versioned_table_id, + &table_schema.column_schemas, + inner.max_staleness_mins, ) .await?; + + // Update the view to point to the new table (uses cache to avoid redundant operations) + Self::ensure_view_points_to_table( + &mut inner, + &base_table_id, + &new_versioned_table_id, + ) + .await?; + + // Add new table to creation cache + Self::add_to_created_tables_cache(&mut inner, new_versioned_table_id.clone()); + + info!( + "successfully processed truncate for {}: new table {}, view updated", + base_table_id, new_versioned_table_id + ); + + // Schedule cleanup of the previous table (done asynchronously to avoid blocking) + if let Some(prev_table_id) = previous_table_id { + Self::remove_from_created_tables_cache(&mut inner, &prev_table_id); + + let client = inner.client.clone(); + let dataset_id = inner.dataset_id.clone(); + + tokio::spawn(async move { 
+ if let Err(err) = client.drop_table(&dataset_id, &prev_table_id).await { + warn!("failed to drop previous table {}: {}", prev_table_id, err); + } else { + info!("successfully cleaned up previous table {}", prev_table_id); + } + }); + } } else { info!( "table schema not found for table_id: {}, skipping truncate", @@ -538,4 +685,78 @@ mod tests { "a____b_c____d" ); } + + #[test] + fn test_versioned_table_naming() { + let mut versions = HashMap::new(); + let base_table = "schema_table".to_string(); + + // Test initial version + let version = versions.entry(base_table.clone()).or_insert(0); + let versioned_name = format!("{}_{}", base_table, version); + assert_eq!(versioned_name, "schema_table_0"); + + // Test increment + *version += 1; + let versioned_name = format!("{}_{}", base_table, version); + assert_eq!(versioned_name, "schema_table_1"); + } + + #[test] + fn test_previous_versioned_table_name() { + let mut versions = HashMap::new(); + let base_table = "schema_table".to_string(); + + // No previous version when starting + assert!(versions.get(&base_table).is_none()); + + // Version 0 has no previous + versions.insert(base_table.clone(), 0); + assert_eq!(versions.get(&base_table), Some(&0)); + + // Version 1 has previous 0 + versions.insert(base_table.clone(), 1); + let prev_version = versions + .get(&base_table) + .and_then(|&v| if v > 0 { Some(v - 1) } else { None }); + assert_eq!(prev_version, Some(0)); + + // Version 2 has previous 1 + versions.insert(base_table.clone(), 2); + let prev_version = versions + .get(&base_table) + .and_then(|&v| if v > 0 { Some(v - 1) } else { None }); + assert_eq!(prev_version, Some(1)); + } + + #[test] + fn test_view_caching_logic() { + let mut created_views = HashMap::new(); + let view_name = "schema_table".to_string(); + let target_table_v0 = "schema_table_0".to_string(); + let target_table_v1 = "schema_table_1".to_string(); + + // Initially no view exists + assert!(created_views.get(&view_name).is_none()); + + // After first 
creation, view points to v0 + created_views.insert(view_name.clone(), target_table_v0.clone()); + assert_eq!(created_views.get(&view_name), Some(&target_table_v0)); + + // Check if view needs update to v1 (should return true since it points to v0) + let needs_update = created_views + .get(&view_name) + .map_or(true, |current| current != &target_table_v1); + assert!(needs_update); + + // After update, view points to v1 + created_views.insert(view_name.clone(), target_table_v1.clone()); + assert_eq!(created_views.get(&view_name), Some(&target_table_v1)); + + // Check if view needs update to v1 again (should return false since it already points to v1) + let needs_update = created_views + .get(&view_name) + .map_or(true, |current| current != &target_table_v1); + assert!(!needs_update); + } } From d65e3109c0384d7226dfdd4b3a83101f80cc0227 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Mon, 11 Aug 2025 14:00:07 +0200 Subject: [PATCH 2/7] Improve --- etl-destinations/src/bigquery/client.rs | 97 ++++++++++++++++++- etl-destinations/src/bigquery/core.rs | 14 ++- etl-destinations/tests/common/bigquery.rs | 2 +- .../tests/integration/bigquery_test.rs | 34 ++++--- 4 files changed, 122 insertions(+), 25 deletions(-) diff --git a/etl-destinations/src/bigquery/client.rs b/etl-destinations/src/bigquery/client.rs index 7f3c565f6..28487923c 100644 --- a/etl-destinations/src/bigquery/client.rs +++ b/etl-destinations/src/bigquery/client.rs @@ -111,6 +111,45 @@ impl BigQueryClient { format!("`{}.{}.{}`", self.project_id, dataset_id, table_id) } + /// Creates a table in BigQuery if it doesn't already exist, otherwise efficiently truncates + /// and recreates the table with the same schema. + /// + /// This method uses BigQuery's CREATE OR REPLACE TABLE statement which is more efficient + /// than dropping and recreating as it preserves table metadata and permissions. + /// + /// Returns `true` if the table was created fresh, `false` if it already existed and was replaced. 
+ pub async fn create_or_replace_table( + &self, + dataset_id: &BigQueryDatasetId, + table_id: &BigQueryTableId, + column_schemas: &[ColumnSchema], + max_staleness_mins: Option, + ) -> EtlResult { + let table_existed = self.table_exists(dataset_id, table_id).await?; + + let full_table_name = self.full_table_name(dataset_id, table_id); + + let columns_spec = Self::create_columns_spec(column_schemas); + let max_staleness_option = if let Some(max_staleness_mins) = max_staleness_mins { + Self::max_staleness_option(max_staleness_mins) + } else { + "".to_string() + }; + + info!( + "creating or replacing table {full_table_name} in BigQuery (existed: {table_existed})" + ); + + let query = format!( + "create or replace table {full_table_name} {columns_spec} {max_staleness_option}" + ); + + let _ = self.query(QueryRequest::new(query)).await?; + + // Return true if it was a fresh creation, false if it was a replacement + Ok(!table_existed) + } + /// Creates a table in BigQuery if it doesn't already exist. /// /// Returns `true` if the table was created, `false` if it already existed. 
@@ -171,7 +210,7 @@ impl BigQueryClient { ) -> EtlResult<()> { let full_table_name = self.full_table_name(dataset_id, table_id); - info!("Truncating table {full_table_name} in BigQuery"); + info!("truncating table {full_table_name} in BigQuery"); let delete_query = format!("truncate table {full_table_name}",); @@ -192,10 +231,10 @@ impl BigQueryClient { let full_view_name = self.full_table_name(dataset_id, view_name); let full_target_table_name = self.full_table_name(dataset_id, target_table_id); - info!("Creating/replacing view {full_view_name} pointing to {full_target_table_name}"); + info!("creating/replacing view {full_view_name} pointing to {full_target_table_name}"); let query = format!( - "CREATE OR REPLACE VIEW {full_view_name} AS SELECT * FROM {full_target_table_name}" + "create or replace view {full_view_name} as select * from {full_target_table_name}" ); let _ = self.query(QueryRequest::new(query)).await?; @@ -213,9 +252,9 @@ impl BigQueryClient { ) -> EtlResult<()> { let full_table_name = self.full_table_name(dataset_id, table_id); - info!("Dropping table {full_table_name} from BigQuery"); + info!("dropping table {full_table_name} from bigquery"); - let query = format!("DROP TABLE IF EXISTS {full_table_name}"); + let query = format!("drop table if exists {full_table_name}"); let _ = self.query(QueryRequest::new(query)).await?; @@ -287,6 +326,7 @@ impl BigQueryClient { let append_rows_response = append_rows_response .map_err(BQError::from) .map_err(bq_error_to_etl_error)?; + if !append_rows_response.row_errors.is_empty() { // We convert the error into an `ETLError`. 
let row_errors = append_rows_response @@ -919,4 +959,51 @@ mod tests { let full_name = format!("`{project_id}.{dataset_id}.{table_id}`"); assert_eq!(full_name, "`test-project.test_dataset.test_table`"); } + + #[test] + fn test_create_or_replace_table_query_generation() { + let project_id = "test-project"; + let dataset_id = "test_dataset"; + let table_id = "test_table"; + + let columns = vec![ + ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true), + ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false), + ]; + + // Simulate the query generation logic + let full_table_name = format!("`{project_id}.{dataset_id}.{table_id}`"); + let columns_spec = BigQueryClient::create_columns_spec(&columns); + let query = format!("create or replace table {full_table_name} {columns_spec}"); + + let expected_query = "create or replace table `test-project.test_dataset.test_table` (`id` int64 not null,`name` string, primary key (`id`) not enforced)"; + assert_eq!(query, expected_query); + } + + #[test] + fn test_create_or_replace_table_query_with_staleness() { + let project_id = "test-project"; + let dataset_id = "test_dataset"; + let table_id = "test_table"; + let max_staleness_mins = 15; + + let columns = vec![ColumnSchema::new( + "id".to_string(), + Type::INT4, + -1, + false, + true, + )]; + + // Simulate the query generation logic with staleness + let full_table_name = format!("`{project_id}.{dataset_id}.{table_id}`"); + let columns_spec = BigQueryClient::create_columns_spec(&columns); + let max_staleness_option = BigQueryClient::max_staleness_option(max_staleness_mins); + let query = format!( + "create or replace table {full_table_name} {columns_spec} {max_staleness_option}" + ); + + let expected_query = "create or replace table `test-project.test_dataset.test_table` (`id` int64 not null, primary key (`id`) not enforced) options (max_staleness = interval 15 minute)"; + assert_eq!(query, expected_query); + } } diff --git 
a/etl-destinations/src/bigquery/core.rs b/etl-destinations/src/bigquery/core.rs index ead9c6006..8858d6e01 100644 --- a/etl-destinations/src/bigquery/core.rs +++ b/etl-destinations/src/bigquery/core.rs @@ -547,10 +547,13 @@ where base_table_id, new_versioned_table_id ); - // Create the new table + // Create or replace the new table. + // + // We unconditionally replace the table if it's there because here we know that + // we need the table to be empty given the truncation. inner .client - .create_table( + .create_or_replace_table( &inner.dataset_id, &new_versioned_table_id, &table_schema.column_schemas, @@ -558,15 +561,16 @@ where ) .await?; - // Update the view to point to the new table (uses cache to avoid redundant operations) + // Update the view to point to the new table. + // + // We do this after the table has been created to that in case of failure, the + // view can be manually updated to point to the new table. Self::ensure_view_points_to_table( &mut inner, &base_table_id, &new_versioned_table_id, ) .await?; - - // Add new table to creation cache Self::add_to_created_tables_cache(&mut inner, new_versioned_table_id.clone()); info!( diff --git a/etl-destinations/tests/common/bigquery.rs b/etl-destinations/tests/common/bigquery.rs index 5ea835825..bb43553c6 100644 --- a/etl-destinations/tests/common/bigquery.rs +++ b/etl-destinations/tests/common/bigquery.rs @@ -154,7 +154,7 @@ impl Drop for BigQueryDatabase { // task is issued, the runtime will offload existing tasks to another worker. 
tokio::task::block_in_place(move || { Handle::current().block_on(async move { - destroy_bigquery(&client, self.project_id(), self.dataset_id()).await; + // destroy_bigquery(&client, self.project_id(), self.dataset_id()).await; }); }); } diff --git a/etl-destinations/tests/integration/bigquery_test.rs b/etl-destinations/tests/integration/bigquery_test.rs index 3d5dd6213..9be530b3c 100644 --- a/etl-destinations/tests/integration/bigquery_test.rs +++ b/etl-destinations/tests/integration/bigquery_test.rs @@ -13,7 +13,7 @@ use etl_telemetry::init_test_tracing; use rand::random; use std::str::FromStr; use std::time::Duration; -use tokio::time::sleep; +use tokio::time::{sleep, timeout}; use crate::common::bigquery::{ BigQueryOrder, BigQueryUser, NonNullableColsScalar, NullableColsArray, NullableColsScalar, @@ -368,10 +368,7 @@ async fn table_subsequent_updates() { assert_eq!(parsed_users_rows, vec![BigQueryUser::new(1, "user_2", 2),]); } -// This test is disabled since truncation is currently not supported by BigQuery when doing CDC -// streaming. The test is kept just for future use. #[tokio::test(flavor = "multi_thread")] -#[ignore] async fn table_truncate_with_batching() { init_test_tracing(); install_crypto_provider_for_bigquery(); @@ -420,17 +417,17 @@ async fn table_truncate_with_batching() { users_state_notify.notified().await; orders_state_notify.notified().await; - // Wait for the 4 inserts (2 per table) and 2 truncates (one per table). + // Wait for the 20 inserts (10 per table) and 2 truncates (1 per table). let event_notify = destination - .wait_for_events_count(vec![(EventType::Insert, 4), (EventType::Truncate, 2)]) + .wait_for_events_count(vec![(EventType::Insert, 20), (EventType::Truncate, 2)]) .await; - // Insert 1 row per each table. + // Insert 10 rows per each table. 
insert_mock_data( &mut database, &database_schema.users_schema().name, &database_schema.orders_schema().name, - 1..=1, + 1..=10, false, ) .await; @@ -445,28 +442,34 @@ async fn table_truncate_with_batching() { .await .unwrap(); - // Insert 1 extra row per each table. + // Insert 2 extra rows per each table. insert_mock_data( &mut database, &database_schema.users_schema().name, &database_schema.orders_schema().name, - 2..=2, + 11..=12, false, ) .await; - event_notify.notified().await; + timeout(Duration::from_secs(10), event_notify.notified()).await; pipeline.shutdown_and_wait().await.unwrap(); - // We query BigQuery directly to get the data which has been inserted by tests expecting that + // We query BigQuery directly to get the data which tests have inserted, expecting that // only the rows after truncation are there. let users_rows = bigquery_database .query_table(database_schema.users_schema().name) .await .unwrap(); let parsed_users_rows = parse_bigquery_table_rows::(users_rows); - assert_eq!(parsed_users_rows, vec![BigQueryUser::new(2, "user_2", 2),]); + assert_eq!( + parsed_users_rows, + vec![ + BigQueryUser::new(11, "user_11", 11), + BigQueryUser::new(12, "user_12", 12), + ] + ); let orders_rows = bigquery_database .query_table(database_schema.orders_schema().name) .await @@ -474,7 +477,10 @@ async fn table_truncate_with_batching() { let parsed_orders_rows = parse_bigquery_table_rows::(orders_rows); assert_eq!( parsed_orders_rows, - vec![BigQueryOrder::new(2, "description_2"),] + vec![ + BigQueryOrder::new(11, "description_11"), + BigQueryOrder::new(12, "description_12") + ] ); } From 65d5876531d0c31325b018c64747cd4e09fcf026 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Mon, 11 Aug 2025 14:17:10 +0200 Subject: [PATCH 3/7] Improve --- etl-destinations/src/bigquery/core.rs | 80 +------------------ etl-destinations/tests/common/bigquery.rs | 2 +- .../tests/integration/bigquery_test.rs | 22 ++--- 3 files changed, 15 insertions(+), 89 deletions(-) diff 
--git a/etl-destinations/src/bigquery/core.rs b/etl-destinations/src/bigquery/core.rs index 8858d6e01..788be8421 100644 --- a/etl-destinations/src/bigquery/core.rs +++ b/etl-destinations/src/bigquery/core.rs @@ -246,7 +246,7 @@ where .table_versions .entry(base_table_id.clone()) .or_insert(0); - format!("{}_{}", base_table_id, version) + format!("{base_table_id}_{version}") } /// Returns the previous versioned table name for cleanup purposes. @@ -276,7 +276,7 @@ where .entry(base_table_id.clone()) .or_insert(0); *version += 1; - format!("{}_{}", base_table_id, version) + format!("{base_table_id}_{version}") } /// Checks if a view needs to be created or updated and performs the operation if needed. @@ -418,7 +418,7 @@ where let mut table_id_to_table_rows = HashMap::new(); let mut truncate_events = Vec::new(); - // Process events until we hit a truncate or run out of events + // Process events until we hit a truncate event or run out of events while let Some(event) = event_iter.peek() { if matches!(event, Event::Truncate(_)) { break; @@ -689,78 +689,4 @@ mod tests { "a____b_c____d" ); } - - #[test] - fn test_versioned_table_naming() { - let mut versions = HashMap::new(); - let base_table = "schema_table".to_string(); - - // Test initial version - let version = versions.entry(base_table.clone()).or_insert(0); - let versioned_name = format!("{}_{}", base_table, version); - assert_eq!(versioned_name, "schema_table_0"); - - // Test increment - *version += 1; - let versioned_name = format!("{}_{}", base_table, version); - assert_eq!(versioned_name, "schema_table_1"); - } - - #[test] - fn test_previous_versioned_table_name() { - let mut versions = HashMap::new(); - let base_table = "schema_table".to_string(); - - // No previous version when starting - assert!(versions.get(&base_table).is_none()); - - // Version 0 has no previous - versions.insert(base_table.clone(), 0); - assert_eq!(versions.get(&base_table), Some(&0)); - - // Version 1 has previous 0 - 
versions.insert(base_table.clone(), 1); - let prev_version = versions - .get(&base_table) - .and_then(|&v| if v > 0 { Some(v - 1) } else { None }); - assert_eq!(prev_version, Some(0)); - - // Version 2 has previous 1 - versions.insert(base_table.clone(), 2); - let prev_version = versions - .get(&base_table) - .and_then(|&v| if v > 0 { Some(v - 1) } else { None }); - assert_eq!(prev_version, Some(1)); - } - - #[test] - fn test_view_caching_logic() { - let mut created_views = HashMap::new(); - let view_name = "schema_table".to_string(); - let target_table_v0 = "schema_table_0".to_string(); - let target_table_v1 = "schema_table_1".to_string(); - - // Initially no view exists - assert!(created_views.get(&view_name).is_none()); - - // After first creation, view points to v0 - created_views.insert(view_name.clone(), target_table_v0.clone()); - assert_eq!(created_views.get(&view_name), Some(&target_table_v0)); - - // Check if view needs update to v1 (should return true since it points to v0) - let needs_update = created_views - .get(&view_name) - .map_or(true, |current| current != &target_table_v1); - assert!(needs_update); - - // After update, view points to v1 - created_views.insert(view_name.clone(), target_table_v1.clone()); - assert_eq!(created_views.get(&view_name), Some(&target_table_v1)); - - // Check if view needs update to v1 again (should return false since it already points to v1) - let needs_update = created_views - .get(&view_name) - .map_or(true, |current| current != &target_table_v1); - assert!(!needs_update); - } } diff --git a/etl-destinations/tests/common/bigquery.rs b/etl-destinations/tests/common/bigquery.rs index bb43553c6..5ea835825 100644 --- a/etl-destinations/tests/common/bigquery.rs +++ b/etl-destinations/tests/common/bigquery.rs @@ -154,7 +154,7 @@ impl Drop for BigQueryDatabase { // task is issued, the runtime will offload existing tasks to another worker. 
tokio::task::block_in_place(move || { Handle::current().block_on(async move { - // destroy_bigquery(&client, self.project_id(), self.dataset_id()).await; + destroy_bigquery(&client, self.project_id(), self.dataset_id()).await; }); }); } diff --git a/etl-destinations/tests/integration/bigquery_test.rs b/etl-destinations/tests/integration/bigquery_test.rs index 9be530b3c..c771a291b 100644 --- a/etl-destinations/tests/integration/bigquery_test.rs +++ b/etl-destinations/tests/integration/bigquery_test.rs @@ -13,7 +13,7 @@ use etl_telemetry::init_test_tracing; use rand::random; use std::str::FromStr; use std::time::Duration; -use tokio::time::{sleep, timeout}; +use tokio::time::sleep; use crate::common::bigquery::{ BigQueryOrder, BigQueryUser, NonNullableColsScalar, NullableColsArray, NullableColsScalar, @@ -417,17 +417,17 @@ async fn table_truncate_with_batching() { users_state_notify.notified().await; orders_state_notify.notified().await; - // Wait for the 20 inserts (10 per table) and 2 truncates (1 per table). + // Wait for the 8 inserts (4 per table + 4 after truncate) and 2 truncates (1 per table). let event_notify = destination - .wait_for_events_count(vec![(EventType::Insert, 20), (EventType::Truncate, 2)]) + .wait_for_events_count(vec![(EventType::Insert, 8), (EventType::Truncate, 2)]) .await; - // Insert 10 rows per each table. + // Insert 2 rows per each table. 
insert_mock_data( &mut database, &database_schema.users_schema().name, &database_schema.orders_schema().name, - 1..=10, + 1..=2, false, ) .await; @@ -447,12 +447,12 @@ async fn table_truncate_with_batching() { &mut database, &database_schema.users_schema().name, &database_schema.orders_schema().name, - 11..=12, + 3..=4, false, ) .await; - timeout(Duration::from_secs(10), event_notify.notified()).await; + event_notify.notified().await; pipeline.shutdown_and_wait().await.unwrap(); @@ -466,8 +466,8 @@ async fn table_truncate_with_batching() { assert_eq!( parsed_users_rows, vec![ - BigQueryUser::new(11, "user_11", 11), - BigQueryUser::new(12, "user_12", 12), + BigQueryUser::new(3, "user_3", 3), + BigQueryUser::new(4, "user_4", 4), ] ); let orders_rows = bigquery_database @@ -478,8 +478,8 @@ async fn table_truncate_with_batching() { assert_eq!( parsed_orders_rows, vec![ - BigQueryOrder::new(11, "description_11"), - BigQueryOrder::new(12, "description_12") + BigQueryOrder::new(3, "description_3"), + BigQueryOrder::new(4, "description_4") ] ); } From 04d8a75e05c490605949a51469502884bd320054 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Mon, 11 Aug 2025 14:36:07 +0200 Subject: [PATCH 4/7] Improve --- etl-destinations/src/bigquery/core.rs | 141 ++++++++++++++------------ 1 file changed, 76 insertions(+), 65 deletions(-) diff --git a/etl-destinations/src/bigquery/core.rs b/etl-destinations/src/bigquery/core.rs index 788be8421..cf1f15e83 100644 --- a/etl-destinations/src/bigquery/core.rs +++ b/etl-destinations/src/bigquery/core.rs @@ -522,83 +522,94 @@ where /// /// Maps PostgreSQL table OIDs to BigQuery table names, creates new versioned tables, /// updates views to point to new tables, and schedules old table cleanup. + /// Deduplicates table IDs to avoid redundant truncate operations on the same table. 
async fn process_truncate_events(&self, truncate_events: Vec) -> EtlResult<()> { let mut inner = self.inner.lock().await; + // Collect and deduplicate all table IDs from all truncate events. + // + // This is done as an optimization since if we have multiple table ids being truncated in a + // row without applying other events in the meanwhile, it doesn't make any sense to create + // new empty tables for each of them. + let mut unique_table_ids = HashSet::new(); for truncate_event in truncate_events { for table_id in truncate_event.rel_ids { - if let Some(table_schema) = inner - .schema_store - .get_table_schema(&TableId::new(table_id)) - .await? - { - let base_table_id = table_name_to_bigquery_table_id(&table_schema.name); - - // Get the previous table name for cleanup - let previous_table_id = - Self::get_previous_versioned_table_name(&inner, &base_table_id); - - // Create the new versioned table - let new_versioned_table_id = - Self::increment_table_version(&mut inner, &base_table_id); - - info!( - "processing truncate for table {}: creating new version {}", - base_table_id, new_versioned_table_id - ); + unique_table_ids.insert(table_id); + } + } - // Create or replace the new table. - // - // We unconditionally replace the table if it's there because here we know that - // we need the table to be empty given the truncation. - inner - .client - .create_or_replace_table( - &inner.dataset_id, - &new_versioned_table_id, - &table_schema.column_schemas, - inner.max_staleness_mins, - ) - .await?; + for table_id in unique_table_ids { + let Some(table_schema) = inner + .schema_store + .get_table_schema(&TableId::new(table_id)) + .await? + else { + info!( + "table schema not found for table_id: {}, skipping truncate", + table_id + ); + continue; + }; - // Update the view to point to the new table. - // - // We do this after the table has been created to that in case of failure, the - // view can be manually updated to point to the new table. 
-                        Self::ensure_view_points_to_table(
-                            &mut inner,
-                            &base_table_id,
-                            &new_versioned_table_id,
-                        )
-                        .await?;
-                        Self::add_to_created_tables_cache(&mut inner, new_versioned_table_id.clone());
+            let base_table_id = table_name_to_bigquery_table_id(&table_schema.name);
 
-                        info!(
-                            "successfully processed truncate for {}: new table {}, view updated",
-                            base_table_id, new_versioned_table_id
-                        );
+            // Get the previous table name for cleanup
+            let previous_table_id = Self::get_previous_versioned_table_name(&inner, &base_table_id);
 
-                        // Schedule cleanup of the previous table (done asynchronously to avoid blocking)
-                        if let Some(prev_table_id) = previous_table_id {
-                            Self::remove_from_created_tables_cache(&mut inner, &prev_table_id);
+            // Create the new versioned table
+            let new_versioned_table_id = Self::increment_table_version(&mut inner, &base_table_id);
 
-                            let client = inner.client.clone();
-                            let dataset_id = inner.dataset_id.clone();
+            info!(
+                "processing truncate for table {}: creating new version {}",
+                base_table_id, new_versioned_table_id
+            );
 
-                            tokio::spawn(async move {
-                                if let Err(err) = client.drop_table(&dataset_id, &prev_table_id).await {
-                                    warn!("failed to drop previous table {}: {}", prev_table_id, err);
-                                } else {
-                                    info!("successfully cleaned up previous table {}", prev_table_id);
-                                }
-                            });
+            // Create or replace the new table.
+            //
+            // We unconditionally replace the table if it's there because here we know that
+            // we need the table to be empty given the truncation.
+            inner
+                .client
+                .create_or_replace_table(
+                    &inner.dataset_id,
+                    &new_versioned_table_id,
+                    &table_schema.column_schemas,
+                    inner.max_staleness_mins,
+                )
+                .await?;
+
+            // Update the view to point to the new table.
+            //
+            // We do this after the table has been created so that in case of failure, the
+            // view can be manually updated to point to the new table.
+            //
+            // Unfortunately, BigQuery doesn't seem to offer transactions for DDL operations so our
+            // implementation is best effort.
+ Self::ensure_view_points_to_table(&mut inner, &base_table_id, &new_versioned_table_id) + .await?; + Self::add_to_created_tables_cache(&mut inner, new_versioned_table_id.clone()); + + info!( + "successfully processed truncate for {}: new table {}, view updated", + base_table_id, new_versioned_table_id + ); + + if let Some(prev_table_id) = previous_table_id { + Self::remove_from_created_tables_cache(&mut inner, &prev_table_id); + + let client = inner.client.clone(); + let dataset_id = inner.dataset_id.clone(); + + // Schedule cleanup of the previous table. We do not care to track this task since + // if it fails, users can clean up the table on their own, but the view will still point + // to the new data. + tokio::spawn(async move { + if let Err(err) = client.drop_table(&dataset_id, &prev_table_id).await { + warn!("failed to drop previous table {}: {}", prev_table_id, err); + } else { + info!("successfully cleaned up previous table {}", prev_table_id); } - } else { - info!( - "table schema not found for table_id: {}, skipping truncate", - table_id - ); - } + }); } } From 63be5f197f510c3328d66e0a1a2a2f6fd9489d23 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Mon, 11 Aug 2025 14:41:09 +0200 Subject: [PATCH 5/7] Improve --- etl-destinations/src/bigquery/core.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/etl-destinations/src/bigquery/core.rs b/etl-destinations/src/bigquery/core.rs index cf1f15e83..17dadc516 100644 --- a/etl-destinations/src/bigquery/core.rs +++ b/etl-destinations/src/bigquery/core.rs @@ -84,6 +84,9 @@ struct Inner { /// Cache of views that have been created and the versioned table they point to. /// This avoids redundant `CREATE OR REPLACE VIEW` calls for views that already point to the correct table. /// Maps view name to the versioned table it currently points to. 
+ /// + /// # Example + /// `{ users_table: users_table_10, orders_table: orders_table_3 }` created_views: HashMap, } @@ -276,6 +279,7 @@ where .entry(base_table_id.clone()) .or_insert(0); *version += 1; + format!("{base_table_id}_{version}") } @@ -313,6 +317,7 @@ where "view {} created/updated to point to {}", view_name, target_table_id ); + Ok(true) } From d0fb2758e79de7ef2cca921b18fc767c80a86a15 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Mon, 11 Aug 2025 14:49:38 +0200 Subject: [PATCH 6/7] Improve --- etl-destinations/src/bigquery/core.rs | 17 +++--- etl-destinations/tests/common/bigquery.rs | 53 +++++++++++++++++++ .../tests/integration/bigquery_test.rs | 6 +++ 3 files changed, 68 insertions(+), 8 deletions(-) diff --git a/etl-destinations/src/bigquery/core.rs b/etl-destinations/src/bigquery/core.rs index 17dadc516..6072b1bd3 100644 --- a/etl-destinations/src/bigquery/core.rs +++ b/etl-destinations/src/bigquery/core.rs @@ -292,14 +292,15 @@ where target_table_id: &BigQueryTableId, ) -> EtlResult { // Check if the view already points to the correct table - if let Some(current_target) = inner.created_views.get(view_name) { - if current_target == target_table_id { - debug!( - "view {} already points to {}, skipping creation", - view_name, target_table_id - ); - return Ok(false); - } + if let Some(current_target) = inner.created_views.get(view_name) + && current_target == target_table_id + { + debug!( + "view {} already points to {}, skipping creation", + view_name, target_table_id + ); + + return Ok(false); } // Create or replace the view diff --git a/etl-destinations/tests/common/bigquery.rs b/etl-destinations/tests/common/bigquery.rs index 5ea835825..a1e8c308f 100644 --- a/etl-destinations/tests/common/bigquery.rs +++ b/etl-destinations/tests/common/bigquery.rs @@ -34,6 +34,29 @@ fn random_dataset_id() -> String { /// /// Provides a unified interface for BigQuery operations in tests, automatically /// handling setup and teardown of test datasets using 
actual Google Cloud credentials. +/// +/// # Table Creation Examples +/// +/// ```rust +/// // Create a simple table with column definitions +/// let table_name = TableName::new("public".to_string(), "users".to_string()); +/// let columns = &[ +/// ("id", "INT64"), +/// ("name", "STRING"), +/// ("age", "INT64"), +/// ("created_at", "TIMESTAMP") +/// ]; +/// db.create_table(table_name, columns).await; +/// +/// // Create a table with custom DDL +/// let ddl = "CREATE TABLE `project.dataset.public_products` ( +/// id INT64, +/// name STRING, +/// price NUMERIC(10,2), +/// tags ARRAY +/// )"; +/// db.create_table_with_ddl(table_name, ddl).await; +/// ``` pub struct BigQueryDatabase { client: Option, sa_key_path: String, @@ -115,6 +138,36 @@ impl BigQueryDatabase { .rows } + /// Manually creates a table in the test dataset using column definitions. + /// + /// Creates a table by generating a DDL statement from the provided column specifications. + /// Each column is specified as a tuple of (column_name, bigquery_type). + pub async fn create_table(&self, table_name: TableName, columns: &[(&str, &str)]) { + let client = self.client().unwrap(); + let project_id = self.project_id(); + let dataset_id = self.dataset_id(); + let table_id = table_name_to_bigquery_table_id(&table_name); + + let column_definitions: Vec = columns + .iter() + .map(|(name, data_type)| format!("{} {}", name, data_type)) + .collect(); + + let ddl = format!( + "CREATE TABLE `{}.{}.{}` ({})", + project_id, + dataset_id, + table_id, + column_definitions.join(", ") + ); + + client + .job() + .query(project_id, QueryRequest::new(ddl)) + .await + .unwrap(); + } + /// Returns the Google Cloud project ID for this database instance. 
fn project_id(&self) -> &str { &self.project_id diff --git a/etl-destinations/tests/integration/bigquery_test.rs b/etl-destinations/tests/integration/bigquery_test.rs index c771a291b..6d6854e30 100644 --- a/etl-destinations/tests/integration/bigquery_test.rs +++ b/etl-destinations/tests/integration/bigquery_test.rs @@ -378,6 +378,12 @@ async fn table_truncate_with_batching() { let bigquery_database = setup_bigquery_connection().await; + // We create table `test_users_1` to simulate an error in the system where a table with that name + // already exists and should be replaced for replication to work correctly. + bigquery_database + .create_table(test_table_name("users_1"), &[("age", "integer")]) + .await; + let store = NotifyingStore::new(); let raw_destination = bigquery_database.build_destination(store.clone()).await; let destination = TestDestinationWrapper::wrap(raw_destination); From a716f0b1c0791115261cbfd648222ef47b91a715 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Mon, 11 Aug 2025 15:04:15 +0200 Subject: [PATCH 7/7] Improve --- etl-destinations/tests/common/bigquery.rs | 3 +-- etl-destinations/tests/integration/bigquery_test.rs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/etl-destinations/tests/common/bigquery.rs b/etl-destinations/tests/common/bigquery.rs index a1e8c308f..d0670998f 100644 --- a/etl-destinations/tests/common/bigquery.rs +++ b/etl-destinations/tests/common/bigquery.rs @@ -142,11 +142,10 @@ impl BigQueryDatabase { /// /// Creates a table by generating a DDL statement from the provided column specifications. /// Each column is specified as a tuple of (column_name, bigquery_type). 
- pub async fn create_table(&self, table_name: TableName, columns: &[(&str, &str)]) { + pub async fn create_table(&self, table_id: &str, columns: &[(&str, &str)]) { let client = self.client().unwrap(); let project_id = self.project_id(); let dataset_id = self.dataset_id(); - let table_id = table_name_to_bigquery_table_id(&table_name); let column_definitions: Vec = columns .iter() diff --git a/etl-destinations/tests/integration/bigquery_test.rs b/etl-destinations/tests/integration/bigquery_test.rs index 6d6854e30..a1f6fdd6f 100644 --- a/etl-destinations/tests/integration/bigquery_test.rs +++ b/etl-destinations/tests/integration/bigquery_test.rs @@ -381,7 +381,7 @@ async fn table_truncate_with_batching() { // We create table `test_users_1` to simulate an error in the system where a table with that name // already exists and should be replaced for replication to work correctly. bigquery_database - .create_table(test_table_name("users_1"), &[("age", "integer")]) + .create_table("test_users_1", &[("age", "integer")]) .await; let store = NotifyingStore::new();