feat: Add support for table_create_disposition in bigquery job for offline store (feast-dev#3762)

* Add bigquery table create disposition to offline store

Signed-off-by: Nick Zeolla <nick.zeolla@starlingbank.com>

* linting

Signed-off-by: Nick Zeolla <nick.zeolla@starlingbank.com>

---------

Signed-off-by: Nick Zeolla <nick.zeolla@starlingbank.com>
nickozilla committed Sep 13, 2023
1 parent fa600fe commit 6a728fe
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion sdk/python/feast/infra/offline_stores/bigquery.py
@@ -19,7 +19,7 @@
import pandas as pd
import pyarrow
import pyarrow.parquet
-from pydantic import StrictStr, validator
+from pydantic import ConstrainedStr, StrictStr, validator
from pydantic.typing import Literal
from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed

@@ -72,6 +72,13 @@ def get_http_client_info():
    return http_client_info.ClientInfo(user_agent=get_user_agent())


+class BigQueryTableCreateDisposition(ConstrainedStr):
+    """Custom constraint for table_create_disposition. To understand more, see:
+    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.create_disposition"""
+
+    values = {"CREATE_NEVER", "CREATE_IF_NEEDED"}
+
+
class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
    """Offline store config for GCP BigQuery"""

@@ -95,6 +102,9 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
    gcs_staging_location: Optional[str] = None
    """ (optional) GCS location used for offloading BigQuery results as parquet files."""

+    table_create_disposition: Optional[BigQueryTableCreateDisposition] = None
+    """ (optional) Specifies whether the job is allowed to create new tables. The default value is CREATE_IF_NEEDED."""
+
    @validator("billing_project_id")
    def project_id_exists(cls, v, values, **kwargs):
        if v and not values["project_id"]:
@@ -324,6 +334,7 @@ def write_logged_features(
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            schema=arrow_schema_to_bq_schema(source.get_schema(registry)),
+            create_disposition=config.offline_store.table_create_disposition,
            time_partitioning=bigquery.TimePartitioning(
                type_=bigquery.TimePartitioningType.DAY,
                field=source.get_log_timestamp_column(),
@@ -384,6 +395,7 @@ def offline_write_batch(
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            schema=arrow_schema_to_bq_schema(pa_schema),
+            create_disposition=config.offline_store.table_create_disposition,
            write_disposition="WRITE_APPEND",  # Default but included for clarity
        )

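For illustration, a minimal sketch of how the new option might be set when constructing the offline store config directly in Python. Only table_create_disposition and project_id appear in the diff above; the project ID value is a placeholder and all other fields are left at their defaults.

from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig

# "my-gcp-project" is a hypothetical project ID used only for this sketch.
config = BigQueryOfflineStoreConfig(
    project_id="my-gcp-project",
    table_create_disposition="CREATE_NEVER",  # load jobs will not create missing tables
)

# Leaving table_create_disposition unset passes None through to
# bigquery.LoadJobConfig(create_disposition=...), so BigQuery applies its
# documented default of CREATE_IF_NEEDED.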
