Skip to content

Commit b9dd668

Browse files
authored
Add more options for auth (#379)
1 parent 322d027 commit b9dd668

File tree

2 files changed

+92
-0
lines changed

2 files changed

+92
-0
lines changed

etl-destinations/src/bigquery/client.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,33 @@ impl BigQueryClient {
9898
Ok(BigQueryClient { project_id, client })
9999
}
100100

101+
/// Creates a new [`BigQueryClient`] using Application Default Credentials.
102+
///
103+
/// Authenticates with BigQuery using the environment's default credentials.
104+
/// Returns an error if credentials are missing or invalid.
105+
pub async fn new_with_adc(project_id: BigQueryProjectId) -> EtlResult<BigQueryClient> {
106+
let client = Client::from_application_default_credentials()
107+
.await
108+
.map_err(bq_error_to_etl_error)?;
109+
110+
Ok(BigQueryClient { project_id, client })
111+
}
112+
113+
/// Creates a new [`BigQueryClient`] using OAuth2 installed flow authentication.
114+
///
115+
/// Authenticates with BigQuery using the OAuth2 installed flow.
116+
pub async fn new_with_flow_authenticator<S: AsRef<[u8]>, P: Into<std::path::PathBuf>>(
117+
project_id: BigQueryProjectId,
118+
secret: S,
119+
persistant_file_path: P,
120+
) -> EtlResult<BigQueryClient> {
121+
let client = Client::from_installed_flow_authenticator(secret, persistant_file_path)
122+
.await
123+
.map_err(bq_error_to_etl_error)?;
124+
125+
Ok(BigQueryClient { project_id, client })
126+
}
127+
101128
/// Returns the fully qualified BigQuery table name.
102129
///
103130
/// Formats the table name as `project_id.dataset_id.table_id` with proper quoting.

etl-destinations/src/bigquery/core.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,71 @@ where
258258
inner: Arc::new(Mutex::new(inner)),
259259
})
260260
}
261+
/// Creates a new [`BigQueryDestination`] using Application Default Credentials (ADC).
262+
///
263+
/// Initializes the BigQuery client with the default credentials and project settings.
264+
/// The `max_staleness_mins` parameter controls table metadata cache freshness.
265+
/// The `max_concurrent_streams` parameter controls parallelism for streaming operations
266+
/// and determines how table rows are split into batches for concurrent processing.
267+
pub async fn new_with_adc(
268+
project_id: String,
269+
dataset_id: BigQueryDatasetId,
270+
max_staleness_mins: Option<u16>,
271+
max_concurrent_streams: usize,
272+
store: S,
273+
) -> EtlResult<Self> {
274+
let client = BigQueryClient::new_with_adc(project_id).await?;
275+
let inner = Inner {
276+
created_tables: HashSet::new(),
277+
created_views: HashMap::new(),
278+
};
279+
280+
Ok(Self {
281+
client,
282+
dataset_id,
283+
max_staleness_mins,
284+
max_concurrent_streams,
285+
store,
286+
inner: Arc::new(Mutex::new(inner)),
287+
})
288+
}
289+
290+
/// Creates a new [`BigQueryDestination`] using an installed flow authenticator.
291+
///
292+
/// Initializes the BigQuery client with a flow authenticator using the provided secret and persistent file path.
293+
/// The `max_staleness_mins` parameter controls table metadata cache freshness.
294+
/// The `max_concurrent_streams` parameter controls parallelism for streaming operations
295+
/// and determines how table rows are split into batches for concurrent processing.
296+
pub async fn new_with_flow_authenticator<Secret, Path>(
297+
project_id: String,
298+
dataset_id: BigQueryDatasetId,
299+
secret: Secret,
300+
persistent_file_path: Path,
301+
max_staleness_mins: Option<u16>,
302+
max_concurrent_streams: usize,
303+
store: S,
304+
) -> EtlResult<Self>
305+
where
306+
Secret: AsRef<[u8]>,
307+
Path: Into<std::path::PathBuf>,
308+
{
309+
let client =
310+
BigQueryClient::new_with_flow_authenticator(project_id, secret, persistent_file_path)
311+
.await?;
312+
let inner = Inner {
313+
created_tables: HashSet::new(),
314+
created_views: HashMap::new(),
315+
};
316+
317+
Ok(Self {
318+
client,
319+
dataset_id,
320+
max_staleness_mins,
321+
max_concurrent_streams,
322+
store,
323+
inner: Arc::new(Mutex::new(inner)),
324+
})
325+
}
261326

262327
/// Prepares a table for CDC streaming operations with schema-aware table creation.
263328
///

0 commit comments

Comments
 (0)