Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 131 additions & 1 deletion etl-destinations/src/bigquery/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl fmt::Display for BigQueryOperationType {
///
/// Provides methods for table management, data insertion, and query execution
/// against BigQuery datasets with authentication and error handling.
#[derive(Clone)]
pub struct BigQueryClient {
project_id: BigQueryProjectId,
client: Client,
Expand Down Expand Up @@ -110,6 +111,45 @@ impl BigQueryClient {
format!("`{}.{}.{}`", self.project_id, dataset_id, table_id)
}

/// Runs a `create or replace table` statement for the given table.
///
/// Before issuing the statement, the table's existence is probed via
/// [`Self::table_exists`]. The probe only feeds the log line and the return value;
/// the statement itself is executed regardless of the outcome. Because the probe and
/// the statement are two separate requests, the returned flag reflects what was
/// observed immediately before the statement ran.
///
/// The statement is assembled from the fully qualified table name, the column
/// specification produced by `Self::create_columns_spec`, and, when
/// `max_staleness_mins` is `Some`, the clause produced by
/// `Self::max_staleness_option`. When it is `None`, an empty string is used in
/// place of that clause.
///
/// Returns `Ok(true)` if no table was found beforehand (fresh creation) and
/// `Ok(false)` if an existing table was replaced. Errors from the existence probe
/// or from the query are propagated.
pub async fn create_or_replace_table(
    &self,
    dataset_id: &BigQueryDatasetId,
    table_id: &BigQueryTableId,
    column_schemas: &[ColumnSchema],
    max_staleness_mins: Option<u16>,
) -> EtlResult<bool> {
    // Probe first: once the statement has run the table always exists, so this is
    // the only point at which "fresh" and "replaced" can be told apart.
    let table_existed = self.table_exists(dataset_id, table_id).await?;

    let full_table_name = self.full_table_name(dataset_id, table_id);
    let columns_spec = Self::create_columns_spec(column_schemas);
    let max_staleness_option = match max_staleness_mins {
        Some(mins) => Self::max_staleness_option(mins),
        None => String::new(),
    };

    info!(
        "creating or replacing table {full_table_name} in BigQuery (existed: {table_existed})"
    );

    let request = QueryRequest::new(format!(
        "create or replace table {full_table_name} {columns_spec} {max_staleness_option}"
    ));
    let _ = self.query(request).await?;

    // `true` means the table is brand new, `false` means an old one was replaced.
    Ok(!table_existed)
}

/// Creates a table in BigQuery if it doesn't already exist.
///
/// Returns `true` if the table was created, `false` if it already existed.
Expand Down Expand Up @@ -170,7 +210,7 @@ impl BigQueryClient {
) -> EtlResult<()> {
let full_table_name = self.full_table_name(dataset_id, table_id);

info!("Truncating table {full_table_name} in BigQuery");
info!("truncating table {full_table_name} in BigQuery");

let delete_query = format!("truncate table {full_table_name}",);

Expand All @@ -179,6 +219,48 @@ impl BigQueryClient {
Ok(())
}

/// Defines (or redefines) a view as `select *` over a target table.
///
/// Both `view_name` and `target_table_id` are resolved against the same
/// `dataset_id` through [`Self::full_table_name`], and a single
/// `create or replace view … as select * from …` statement is executed.
///
/// Used by truncation flows to repoint a view at a new table version.
///
/// Returns `Ok(())` once the query has completed; any query error is propagated.
pub async fn create_or_replace_view(
    &self,
    dataset_id: &BigQueryDatasetId,
    view_name: &BigQueryTableId,
    target_table_id: &BigQueryTableId,
) -> EtlResult<()> {
    let full_target_table_name = self.full_table_name(dataset_id, target_table_id);
    let full_view_name = self.full_table_name(dataset_id, view_name);

    info!("creating/replacing view {full_view_name} pointing to {full_target_table_name}");

    let request = QueryRequest::new(format!(
        "create or replace view {full_view_name} as select * from {full_target_table_name}"
    ));
    let _ = self.query(request).await?;

    Ok(())
}

/// Removes a table from BigQuery.
///
/// Executes `drop table if exists` against the fully qualified table name built by
/// [`Self::full_table_name`]. The statement carries the `if exists` guard, so it is
/// written to tolerate a table that is already absent.
///
/// Returns `Ok(())` once the query has completed; any query error is propagated.
pub async fn drop_table(
    &self,
    dataset_id: &BigQueryDatasetId,
    table_id: &BigQueryTableId,
) -> EtlResult<()> {
    let full_table_name = self.full_table_name(dataset_id, table_id);

    info!("dropping table {full_table_name} from bigquery");

    let request = QueryRequest::new(format!("drop table if exists {full_table_name}"));
    let _ = self.query(request).await?;

    Ok(())
}

/// Checks whether a table exists in the BigQuery dataset.
///
/// Returns `true` if the table exists, `false` otherwise.
Expand Down Expand Up @@ -244,6 +326,7 @@ impl BigQueryClient {
let append_rows_response = append_rows_response
.map_err(BQError::from)
.map_err(bq_error_to_etl_error)?;

if !append_rows_response.row_errors.is_empty() {
// We convert the error into an `ETLError`.
let row_errors = append_rows_response
Expand Down Expand Up @@ -876,4 +959,51 @@ mod tests {
let full_name = format!("`{project_id}.{dataset_id}.{table_id}`");
assert_eq!(full_name, "`test-project.test_dataset.test_table`");
}

/// Checks the text of a `create or replace table` statement built without a
/// staleness clause.
///
/// The statement is assembled by hand from the same pieces the client uses
/// (`create_columns_spec` plus a backtick-quoted table name); the client method
/// itself is not invoked, so no BigQuery connection is needed.
#[test]
fn test_create_or_replace_table_query_generation() {
    // Per the expected output below: `id` is rendered as a non-null primary key and
    // `name` as a plain nullable column.
    let columns = [
        ColumnSchema::new("id".to_string(), Type::INT4, -1, false, true),
        ColumnSchema::new("name".to_string(), Type::TEXT, -1, true, false),
    ];
    let columns_spec = BigQueryClient::create_columns_spec(&columns);

    let (project_id, dataset_id, table_id) = ("test-project", "test_dataset", "test_table");
    let full_table_name = format!("`{project_id}.{dataset_id}.{table_id}`");

    let query = format!("create or replace table {full_table_name} {columns_spec}");

    assert_eq!(
        query,
        "create or replace table `test-project.test_dataset.test_table` (`id` int64 not null,`name` string, primary key (`id`) not enforced)"
    );
}

/// Checks the text of a `create or replace table` statement that carries a
/// max-staleness clause.
///
/// As with the test without staleness, the statement is assembled by hand from
/// `create_columns_spec` and `max_staleness_option`; the client method itself is not
/// invoked.
#[test]
fn test_create_or_replace_table_query_with_staleness() {
    // A single non-null primary key column keeps the expected string short.
    let columns = [ColumnSchema::new(
        "id".to_string(),
        Type::INT4,
        -1,
        false,
        true,
    )];
    let columns_spec = BigQueryClient::create_columns_spec(&columns);

    // 15 minutes should surface verbatim in the rendered options clause.
    let max_staleness_option = BigQueryClient::max_staleness_option(15);

    let (project_id, dataset_id, table_id) = ("test-project", "test_dataset", "test_table");
    let full_table_name = format!("`{project_id}.{dataset_id}.{table_id}`");

    let query = format!(
        "create or replace table {full_table_name} {columns_spec} {max_staleness_option}"
    );

    assert_eq!(
        query,
        "create or replace table `test-project.test_dataset.test_table` (`id` int64 not null, primary key (`id`) not enforced) options (max_staleness = interval 15 minute)"
    );
}
}
Loading
Loading