diff --git a/etl-destinations/src/bigquery/client.rs b/etl-destinations/src/bigquery/client.rs
index b88b286a3..73c90fb7b 100644
--- a/etl-destinations/src/bigquery/client.rs
+++ b/etl-destinations/src/bigquery/client.rs
@@ -98,6 +98,33 @@ impl BigQueryClient {
         Ok(BigQueryClient { project_id, client })
     }
 
+    /// Creates a new [`BigQueryClient`] using Application Default Credentials.
+    ///
+    /// Authenticates with BigQuery using the environment's default credentials.
+    /// Returns an error if credentials are missing or invalid.
+    pub async fn new_with_adc(project_id: BigQueryProjectId) -> EtlResult<Self> {
+        let client = Client::from_application_default_credentials()
+            .await
+            .map_err(bq_error_to_etl_error)?;
+
+        Ok(BigQueryClient { project_id, client })
+    }
+
+    /// Creates a new [`BigQueryClient`] using OAuth2 installed flow authentication.
+    ///
+    /// Authenticates with BigQuery using the OAuth2 installed flow.
+    pub async fn new_with_flow_authenticator<S: AsRef<[u8]>, P: Into<PathBuf>>(
+        project_id: BigQueryProjectId,
+        secret: S,
+        persistant_file_path: P,
+    ) -> EtlResult<Self> {
+        let client = Client::from_installed_flow_authenticator(secret, persistant_file_path)
+            .await
+            .map_err(bq_error_to_etl_error)?;
+
+        Ok(BigQueryClient { project_id, client })
+    }
+
     /// Returns the fully qualified BigQuery table name.
     ///
     /// Formats the table name as `project_id.dataset_id.table_id` with proper quoting.
diff --git a/etl-destinations/src/bigquery/core.rs b/etl-destinations/src/bigquery/core.rs
index 27b6fc14b..5fac9f678 100644
--- a/etl-destinations/src/bigquery/core.rs
+++ b/etl-destinations/src/bigquery/core.rs
@@ -258,6 +258,71 @@ where
             inner: Arc::new(Mutex::new(inner)),
         })
     }
+    /// Creates a new [`BigQueryDestination`] using Application Default Credentials (ADC).
+    ///
+    /// Initializes the BigQuery client with the default credentials and project settings.
+    /// The `max_staleness_mins` parameter controls table metadata cache freshness.
+    /// The `max_concurrent_streams` parameter controls parallelism for streaming operations
+    /// and determines how table rows are split into batches for concurrent processing.
+    pub async fn new_with_adc(
+        project_id: String,
+        dataset_id: BigQueryDatasetId,
+        max_staleness_mins: Option<u16>,
+        max_concurrent_streams: usize,
+        store: S,
+    ) -> EtlResult<Self> {
+        let client = BigQueryClient::new_with_adc(project_id).await?;
+        let inner = Inner {
+            created_tables: HashSet::new(),
+            created_views: HashMap::new(),
+        };
+
+        Ok(Self {
+            client,
+            dataset_id,
+            max_staleness_mins,
+            max_concurrent_streams,
+            store,
+            inner: Arc::new(Mutex::new(inner)),
+        })
+    }
+
+    /// Creates a new [`BigQueryDestination`] using an installed flow authenticator.
+    ///
+    /// Initializes the BigQuery client with a flow authenticator using the provided secret and persistent file path.
+    /// The `max_staleness_mins` parameter controls table metadata cache freshness.
+    /// The `max_concurrent_streams` parameter controls parallelism for streaming operations
+    /// and determines how table rows are split into batches for concurrent processing.
+    pub async fn new_with_flow_authenticator<Secret, Path>(
+        project_id: String,
+        dataset_id: BigQueryDatasetId,
+        secret: Secret,
+        persistent_file_path: Path,
+        max_staleness_mins: Option<u16>,
+        max_concurrent_streams: usize,
+        store: S,
+    ) -> EtlResult<Self>
+    where
+        Secret: AsRef<[u8]>,
+        Path: Into<PathBuf>,
+    {
+        let client =
+            BigQueryClient::new_with_flow_authenticator(project_id, secret, persistent_file_path)
+                .await?;
+        let inner = Inner {
+            created_tables: HashSet::new(),
+            created_views: HashMap::new(),
+        };
+
+        Ok(Self {
+            client,
+            dataset_id,
+            max_staleness_mins,
+            max_concurrent_streams,
+            store,
+            inner: Arc::new(Mutex::new(inner)),
+        })
+    }
 
     /// Prepares a table for CDC streaming operations with schema-aware table creation.
     ///