diff --git a/etl-destinations/src/bigquery/client.rs b/etl-destinations/src/bigquery/client.rs index 55bbfe0d8..28487923c 100644 --- a/etl-destinations/src/bigquery/client.rs +++ b/etl-destinations/src/bigquery/client.rs @@ -62,6 +62,7 @@ impl fmt::Display for BigQueryOperationType { /// /// Provides methods for table management, data insertion, and query execution /// against BigQuery datasets with authentication and error handling. +#[derive(Clone)] pub struct BigQueryClient { project_id: BigQueryProjectId, client: Client, @@ -110,6 +111,45 @@ impl BigQueryClient { format!("`{}.{}.{}`", self.project_id, dataset_id, table_id) } + /// Creates a table in BigQuery if it doesn't already exist, otherwise efficiently truncates + /// and recreates the table with the same schema. + /// + /// This method uses BigQuery's CREATE OR REPLACE TABLE statement which is more efficient + /// than dropping and recreating as it preserves table metadata and permissions. + /// + /// Returns `true` if the table was created fresh, `false` if it already existed and was replaced. 
+ pub async fn create_or_replace_table( + &self, + dataset_id: &BigQueryDatasetId, + table_id: &BigQueryTableId, + column_schemas: &[ColumnSchema], + max_staleness_mins: Option<u16>, + ) -> EtlResult<bool> { + let table_existed = self.table_exists(dataset_id, table_id).await?; + + let full_table_name = self.full_table_name(dataset_id, table_id); + + let columns_spec = Self::create_columns_spec(column_schemas); + let max_staleness_option = if let Some(max_staleness_mins) = max_staleness_mins { + Self::max_staleness_option(max_staleness_mins) + } else { + "".to_string() + }; + + info!( + "creating or replacing table {full_table_name} in BigQuery (existed: {table_existed})" + ); + + let query = format!( + "create or replace table {full_table_name} {columns_spec} {max_staleness_option}" + ); + + let _ = self.query(QueryRequest::new(query)).await?; + + // Return true if it was a fresh creation, false if it was a replacement + Ok(!table_existed) + } + /// Creates a table in BigQuery if it doesn't already exist. /// /// Returns `true` if the table was created, `false` if it already existed. @@ -170,7 +210,7 @@ impl BigQueryClient { ) -> EtlResult<()> { let full_table_name = self.full_table_name(dataset_id, table_id); - info!("Truncating table {full_table_name} in BigQuery"); + info!("truncating table {full_table_name} in BigQuery"); let delete_query = format!("truncate table {full_table_name}",); @@ -179,6 +219,48 @@ impl BigQueryClient { Ok(()) } + /// Creates or replaces a view that points to the specified versioned table. + /// + /// This is used during truncation operations to redirect the view to a new table version. 
+ pub async fn create_or_replace_view( + &self, + dataset_id: &BigQueryDatasetId, + view_name: &BigQueryTableId, + target_table_id: &BigQueryTableId, + ) -> EtlResult<()> { + let full_view_name = self.full_table_name(dataset_id, view_name); + let full_target_table_name = self.full_table_name(dataset_id, target_table_id); + + info!("creating/replacing view {full_view_name} pointing to {full_target_table_name}"); + + let query = format!( + "create or replace view {full_view_name} as select * from {full_target_table_name}" + ); + + let _ = self.query(QueryRequest::new(query)).await?; + + Ok(()) + } + + /// Drops a table from BigQuery. + /// + /// Executes a DROP TABLE statement to remove the table and all its data. + pub async fn drop_table( + &self, + dataset_id: &BigQueryDatasetId, + table_id: &BigQueryTableId, + ) -> EtlResult<()> { + let full_table_name = self.full_table_name(dataset_id, table_id); + + info!("dropping table {full_table_name} from bigquery"); + + let query = format!("drop table if exists {full_table_name}"); + + let _ = self.query(QueryRequest::new(query)).await?; + + Ok(()) + } + /// Checks whether a table exists in the BigQuery dataset. /// /// Returns `true` if the table exists, `false` otherwise. @@ -244,6 +326,7 @@ impl BigQueryClient { let append_rows_response = append_rows_response .map_err(BQError::from) .map_err(bq_error_to_etl_error)?; + if !append_rows_response.row_errors.is_empty() { // We convert the error into an `ETLError`. 
let row_errors = append_rows_response @@ -876,4 +959,51 @@ mod tests { let full_name = format!("`{project_id}.{dataset_id}.{table_id}`"); assert_eq!(full_name, "`test-project.test_dataset.test_table`"); } + + #[test] + fn test_create_or_replace_table_query_generation() { + let project_id = "test-project"; + let dataset_id = "test_dataset"; + let table_id = "test_table"; + + let columns = vec![ + ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true), + ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false), + ]; + + // Simulate the query generation logic + let full_table_name = format!("`{project_id}.{dataset_id}.{table_id}`"); + let columns_spec = BigQueryClient::create_columns_spec(&columns); + let query = format!("create or replace table {full_table_name} {columns_spec}"); + + let expected_query = "create or replace table `test-project.test_dataset.test_table` (`id` int64 not null,`name` string, primary key (`id`) not enforced)"; + assert_eq!(query, expected_query); + } + + #[test] + fn test_create_or_replace_table_query_with_staleness() { + let project_id = "test-project"; + let dataset_id = "test_dataset"; + let table_id = "test_table"; + let max_staleness_mins = 15; + + let columns = vec![ColumnSchema::new( + "id".to_string(), + Type::INT4, + -1, + false, + true, + )]; + + // Simulate the query generation logic with staleness + let full_table_name = format!("`{project_id}.{dataset_id}.{table_id}`"); + let columns_spec = BigQueryClient::create_columns_spec(&columns); + let max_staleness_option = BigQueryClient::max_staleness_option(max_staleness_mins); + let query = format!( + "create or replace table {full_table_name} {columns_spec} {max_staleness_option}" + ); + + let expected_query = "create or replace table `test-project.test_dataset.test_table` (`id` int64 not null, primary key (`id`) not enforced) options (max_staleness = interval 15 minute)"; + assert_eq!(query, expected_query); + } } diff --git 
a/etl-destinations/src/bigquery/core.rs b/etl-destinations/src/bigquery/core.rs index 71b5b7c5a..6072b1bd3 100644 --- a/etl-destinations/src/bigquery/core.rs +++ b/etl-destinations/src/bigquery/core.rs @@ -77,6 +77,17 @@ struct Inner { /// Cache of table IDs that have been successfully created or verified to exist. /// This avoids redundant `create_table_if_missing` calls for known tables. created_tables: HashSet<BigQueryTableId>, + /// Tracks the current version number for each table to support truncation. + /// Maps base table name to its current version number (e.g., "schema_table" -> 2). + // TODO: add mapping in the store. + table_versions: HashMap<BigQueryTableId, u64>, + /// Cache of views that have been created and the versioned table they point to. + /// This avoids redundant `CREATE OR REPLACE VIEW` calls for views that already point to the correct table. + /// Maps view name to the versioned table it currently points to. + /// + /// # Example + /// `{ users_table: users_table_10, orders_table: orders_table_3 }` + created_views: HashMap<BigQueryTableId, BigQueryTableId>, } /// A BigQuery destination that implements the ETL [`Destination`] trait. 
@@ -110,6 +121,8 @@ where max_staleness_mins, schema_store, created_tables: HashSet::new(), + table_versions: HashMap::new(), + created_views: HashMap::new(), }; Ok(Self { @@ -135,6 +148,8 @@ where max_staleness_mins, schema_store, created_tables: HashSet::new(), + table_versions: HashMap::new(), + created_views: HashMap::new(), }; Ok(Self { @@ -168,34 +183,40 @@ where ) })?; - let table_id = table_name_to_bigquery_table_id(&table_schema.name); + let base_table_id = table_name_to_bigquery_table_id(&table_schema.name); + let versioned_table_id = Self::get_versioned_table_name(inner, &base_table_id); - // Optimistically skip table creation if we've already seen this table - if !inner.created_tables.contains(&table_id) { + // Optimistically skip table creation if we've already seen this versioned table + if !inner.created_tables.contains(&versioned_table_id) { inner .client .create_table_if_missing( &inner.dataset_id, - &table_id, + &versioned_table_id, &table_schema.column_schemas, inner.max_staleness_mins, ) .await?; - // Add the table to the cache (with random eviction if needed) - Self::add_to_created_tables_cache(inner, table_id.clone()); + // Add the versioned table to the cache + Self::add_to_created_tables_cache(inner, versioned_table_id.clone()); - debug!("table {table_id} added to creation cache"); + debug!("versioned table {versioned_table_id} added to creation cache"); } else { - debug!("table {table_id} found in creation cache, skipping existence check"); + debug!( + "versioned table {versioned_table_id} found in creation cache, skipping existence check" + ); } + // Ensure view points to this versioned table (uses cache to avoid redundant operations) + Self::ensure_view_points_to_table(inner, &base_table_id, &versioned_table_id).await?; + let table_descriptor = BigQueryClient::column_schemas_to_table_descriptor( &table_schema.column_schemas, use_cdc_sequence_column, ); - Ok((table_id, table_descriptor)) + Ok((versioned_table_id, table_descriptor)) } /// 
Adds a table ID to the created tables cache if not present. @@ -217,6 +238,90 @@ where inner.created_tables.remove(table_id); } + /// Returns the versioned table name for the current version of a base table. + /// + /// If no version exists for the table, initializes it to version 0. + fn get_versioned_table_name( + inner: &mut Inner, + base_table_id: &BigQueryTableId, + ) -> BigQueryTableId { + let version = inner + .table_versions + .entry(base_table_id.clone()) + .or_insert(0); + format!("{base_table_id}_{version}") + } + + /// Returns the previous versioned table name for cleanup purposes. + fn get_previous_versioned_table_name( + inner: &Inner, + base_table_id: &BigQueryTableId, + ) -> Option<BigQueryTableId> { + inner + .table_versions + .get(base_table_id) + .and_then(|&version| { + if version > 0 { + Some(format!("{}_{}", base_table_id, version - 1)) + } else { + None + } + }) + } + + /// Increments the version for a table and returns the new versioned table name. + fn increment_table_version( + inner: &mut Inner, + base_table_id: &BigQueryTableId, + ) -> BigQueryTableId { + let version = inner + .table_versions + .entry(base_table_id.clone()) + .or_insert(0); + *version += 1; + + format!("{base_table_id}_{version}") + } + + /// Checks if a view needs to be created or updated and performs the operation if needed. + /// + /// Returns `true` if the view was created/updated, `false` if it was already pointing to the correct table. 
+ async fn ensure_view_points_to_table( + inner: &mut Inner, + view_name: &BigQueryTableId, + target_table_id: &BigQueryTableId, + ) -> EtlResult<bool> { + // Check if the view already points to the correct table + if let Some(current_target) = inner.created_views.get(view_name) + && current_target == target_table_id + { + debug!( + "view {} already points to {}, skipping creation", + view_name, target_table_id + ); + + return Ok(false); + } + + // Create or replace the view + inner + .client + .create_or_replace_view(&inner.dataset_id, view_name, target_table_id) + .await?; + + // Update cache + inner + .created_views + .insert(view_name.clone(), target_table_id.clone()); + + debug!( + "view {} created/updated to point to {}", + view_name, target_table_id + ); + + Ok(true) + } + /// Handles streaming rows with fallback table creation on missing table errors. /// /// Attempts to stream rows to BigQuery. If streaming fails with a table not found error, @@ -319,7 +424,7 @@ where let mut table_id_to_table_rows = HashMap::new(); let mut truncate_events = Vec::new(); - // Process events until we hit a truncate or run out of events + // Process events until we hit a truncate event or run out of events while let Some(event) = event_iter.peek() { if matches!(event, Event::Truncate(_)) { break; } @@ -408,46 +513,109 @@ where // Process truncate events if !truncate_events.is_empty() { - // Right now we are not processing truncate messages, but we do the streaming split - // just to try out if splitting the streaming affects performance so that we might - // need it down the line once we figure out a solution for truncation. - warn!( - "'TRUNCATE' events are not supported, skipping apply of {} 'TRUNCATE' events", + info!( + "Processing {} 'TRUNCATE' events with versioned table recreation", truncate_events.len() ); + self.process_truncate_events(truncate_events).await?; } } Ok(()) } - /// Processes truncate events by executing `TRUNCATE TABLE` statements in BigQuery. 
+ /// Processes truncate events by creating new versioned tables and updating views. /// - /// Maps PostgreSQL table OIDs to BigQuery table names and issues truncate commands. - #[allow(dead_code)] + /// Maps PostgreSQL table OIDs to BigQuery table names, creates new versioned tables, + /// updates views to point to new tables, and schedules old table cleanup. + /// Deduplicates table IDs to avoid redundant truncate operations on the same table. async fn process_truncate_events(&self, truncate_events: Vec<TruncateEvent>) -> EtlResult<()> { - let inner = self.inner.lock().await; + let mut inner = self.inner.lock().await; + // Collect and deduplicate all table IDs from all truncate events. + // + // This is done as an optimization since if we have multiple table ids being truncated in a + // row without applying other events in the meanwhile, it doesn't make any sense to create + // new empty tables for each of them. + let mut unique_table_ids = HashSet::new(); for truncate_event in truncate_events { for table_id in truncate_event.rel_ids { - if let Some(table_schema) = inner - .schema_store - .get_table_schema(&TableId::new(table_id)) - .await? - { - inner - .client - .truncate_table( - &inner.dataset_id, - &table_name_to_bigquery_table_id(&table_schema.name), - ) - .await?; - } else { - info!( - "table schema not found for table_id: {}, skipping truncate", - table_id - ); - } + unique_table_ids.insert(table_id); + } + } + + for table_id in unique_table_ids { + let Some(table_schema) = inner + .schema_store + .get_table_schema(&TableId::new(table_id)) + .await? 
+ else { + info!( + "table schema not found for table_id: {}, skipping truncate", + table_id + ); + continue; + }; + + let base_table_id = table_name_to_bigquery_table_id(&table_schema.name); + + // Get the previous table name for cleanup + let previous_table_id = Self::get_previous_versioned_table_name(&inner, &base_table_id); + + // Create the new versioned table + let new_versioned_table_id = Self::increment_table_version(&mut inner, &base_table_id); + + info!( + "processing truncate for table {}: creating new version {}", + base_table_id, new_versioned_table_id + ); + + // Create or replace the new table. + // + // We unconditionally replace the table if it's there because here we know that + // we need the table to be empty given the truncation. + inner + .client + .create_or_replace_table( + &inner.dataset_id, + &new_versioned_table_id, + &table_schema.column_schemas, + inner.max_staleness_mins, + ) + .await?; + + // Update the view to point to the new table. + // + // We do this after the table has been created to that in case of failure, the + // view can be manually updated to point to the new table. + // + // Unfortunately, BigQuery doesn't seem to offer transactions for DDL operations so our + // implementation is best effort. + Self::ensure_view_points_to_table(&mut inner, &base_table_id, &new_versioned_table_id) + .await?; + Self::add_to_created_tables_cache(&mut inner, new_versioned_table_id.clone()); + + info!( + "successfully processed truncate for {}: new table {}, view updated", + base_table_id, new_versioned_table_id + ); + + if let Some(prev_table_id) = previous_table_id { + Self::remove_from_created_tables_cache(&mut inner, &prev_table_id); + + let client = inner.client.clone(); + let dataset_id = inner.dataset_id.clone(); + + // Schedule cleanup of the previous table. We do not care to track this task since + // if it fails, users can clean up the table on their own, but the view will still point + // to the new data. 
+ tokio::spawn(async move { + if let Err(err) = client.drop_table(&dataset_id, &prev_table_id).await { + warn!("failed to drop previous table {}: {}", prev_table_id, err); + } else { + info!("successfully cleaned up previous table {}", prev_table_id); + } + }); } } diff --git a/etl-destinations/tests/common/bigquery.rs b/etl-destinations/tests/common/bigquery.rs index 5ea835825..d0670998f 100644 --- a/etl-destinations/tests/common/bigquery.rs +++ b/etl-destinations/tests/common/bigquery.rs @@ -34,6 +34,29 @@ fn random_dataset_id() -> String { /// /// Provides a unified interface for BigQuery operations in tests, automatically /// handling setup and teardown of test datasets using actual Google Cloud credentials. +/// +/// # Table Creation Examples +/// +/// ```rust +/// // Create a simple table with column definitions +/// let table_name = TableName::new("public".to_string(), "users".to_string()); +/// let columns = &[ +/// ("id", "INT64"), +/// ("name", "STRING"), +/// ("age", "INT64"), +/// ("created_at", "TIMESTAMP") +/// ]; +/// db.create_table(table_name, columns).await; +/// +/// // Create a table with custom DDL +/// let ddl = "CREATE TABLE `project.dataset.public_products` ( +/// id INT64, +/// name STRING, +/// price NUMERIC(10,2), +/// tags ARRAY<STRING> +/// )"; +/// db.create_table_with_ddl(table_name, ddl).await; +/// ``` pub struct BigQueryDatabase { client: Option<Client>, sa_key_path: String, @@ -115,6 +138,35 @@ impl BigQueryDatabase { .rows } + /// Manually creates a table in the test dataset using column definitions. + /// + /// Creates a table by generating a DDL statement from the provided column specifications. + /// Each column is specified as a tuple of (column_name, bigquery_type). 
+ pub async fn create_table(&self, table_id: &str, columns: &[(&str, &str)]) { + let client = self.client().unwrap(); + let project_id = self.project_id(); + let dataset_id = self.dataset_id(); + + let column_definitions: Vec<String> = columns + .iter() + .map(|(name, data_type)| format!("{} {}", name, data_type)) + .collect(); + + let ddl = format!( + "CREATE TABLE `{}.{}.{}` ({})", + project_id, + dataset_id, + table_id, + column_definitions.join(", ") + ); + + client + .job() + .query(project_id, QueryRequest::new(ddl)) + .await + .unwrap(); + } + /// Returns the Google Cloud project ID for this database instance. fn project_id(&self) -> &str { &self.project_id diff --git a/etl-destinations/tests/integration/bigquery_test.rs b/etl-destinations/tests/integration/bigquery_test.rs index 3d5dd6213..a1f6fdd6f 100644 --- a/etl-destinations/tests/integration/bigquery_test.rs +++ b/etl-destinations/tests/integration/bigquery_test.rs @@ -368,10 +368,7 @@ async fn table_subsequent_updates() { assert_eq!(parsed_users_rows, vec![BigQueryUser::new(1, "user_2", 2),]); } -// This test is disabled since truncation is currently not supported by BigQuery when doing CDC -// streaming. The test is kept just for future use. #[tokio::test(flavor = "multi_thread")] -#[ignore] async fn table_truncate_with_batching() { init_test_tracing(); install_crypto_provider_for_bigquery(); @@ -381,6 +378,12 @@ async fn table_truncate_with_batching() { let bigquery_database = setup_bigquery_connection().await; + // We create table `test_users_1` to simulate an error in the system where a table with that name + // already exists and should be replaced for replication to work correctly. 
+ bigquery_database + .create_table("test_users_1", &[("age", "integer")]) + .await; + let store = NotifyingStore::new(); let raw_destination = bigquery_database.build_destination(store.clone()).await; let destination = TestDestinationWrapper::wrap(raw_destination); @@ -420,17 +423,17 @@ async fn table_truncate_with_batching() { users_state_notify.notified().await; orders_state_notify.notified().await; - // Wait for the 4 inserts (2 per table) and 2 truncates (one per table). + // Wait for the 8 inserts (4 per table + 4 after truncate) and 2 truncates (1 per table). let event_notify = destination - .wait_for_events_count(vec![(EventType::Insert, 4), (EventType::Truncate, 2)]) + .wait_for_events_count(vec![(EventType::Insert, 8), (EventType::Truncate, 2)]) .await; - // Insert 1 row per each table. + // Insert 2 rows per each table. insert_mock_data( &mut database, &database_schema.users_schema().name, &database_schema.orders_schema().name, - 1..=1, + 1..=2, false, ) .await; @@ -445,12 +448,12 @@ async fn table_truncate_with_batching() { .await .unwrap(); - // Insert 1 extra row per each table. + // Insert 2 extra rows per each table. insert_mock_data( &mut database, &database_schema.users_schema().name, &database_schema.orders_schema().name, - 2..=2, + 3..=4, false, ) .await; @@ -459,14 +462,20 @@ async fn table_truncate_with_batching() { pipeline.shutdown_and_wait().await.unwrap(); - // We query BigQuery directly to get the data which has been inserted by tests expecting that + // We query BigQuery directly to get the data which tests have inserted, expecting that // only the rows after truncation are there. 
let users_rows = bigquery_database .query_table(database_schema.users_schema().name) .await .unwrap(); let parsed_users_rows = parse_bigquery_table_rows::<BigQueryUser>(users_rows); - assert_eq!(parsed_users_rows, vec![BigQueryUser::new(2, "user_2", 2),]); + assert_eq!( + parsed_users_rows, + vec![ + BigQueryUser::new(3, "user_3", 3), + BigQueryUser::new(4, "user_4", 4), + ] + ); let orders_rows = bigquery_database .query_table(database_schema.orders_schema().name) .await .unwrap(); let parsed_orders_rows = parse_bigquery_table_rows::<BigQueryOrder>(orders_rows); assert_eq!( parsed_orders_rows, - vec![BigQueryOrder::new(2, "description_2"),] + vec![ + BBigQueryOrder::new(3, "description_3"), + BigQueryOrder::new(4, "description_4") + ] ); }