<a href="https://colab.research.google.com/github/rastringer/blog_notebooks/blob/main/bq_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# BigQuery via Vertex Pipelines

In this notebook, we will look at performing BigQuery operations within components of a Vertex Pipeline.

We will use both [Google Cloud Components](https://cloud.google.com/vertex-ai/docs/pipelines/gcpc-list#bqml_components) and [lightweight Python](https://www.kubeflow.org/docs/components/pipelines/v2/components/lightweight-python-components/) components.

In [None]:
# Install libraries
!pip install --upgrade kfp google-cloud-aiplatform google-cloud-bigquery google-cloud-pipeline-components

In [None]:
# Authenticate via Colab (if on Vertex Workbench, you're already authenticated)
from google.colab import auth
auth.authenticate_user()

### The 'location' gotcha

Vertex AI resources are usually placed in a region such as `us-central1` or `europe-west4`. BigQuery datasets, however, often live in a multi-region such as `US` or `EU`, so the dataset location may not match the region you use for Vertex AI. Check the dataset's details in the BigQuery console if in doubt. In this example the dataset is in `US` rather than `us-central1`, and BigQuery will not find the dataset if the location is wrong.
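A dataset's location can also be checked from code. The BigQuery client's `get_dataset` method returns a `Dataset` object whose `location` attribute holds values such as `US` or `europe-west4`. The sketch below assumes `client` is a `google.cloud.bigquery.Client`; the helper names `dataset_location` and `location_matches` are hypothetical, not part of the library.

```python
def dataset_location(client, dataset_id: str) -> str:
    """Return a BigQuery dataset's location, e.g. "US" or "europe-west4".

    client: a google.cloud.bigquery.Client (or any object with the same
    get_dataset method); dataset_id: "<project>.<dataset>".
    """
    return client.get_dataset(dataset_id).location


def location_matches(client, dataset_id: str, location: str) -> bool:
    """True if the location we plan to pass to BigQuery jobs matches the dataset's."""
    return dataset_location(client, dataset_id) == location


# Example (in the notebook, once the environment variables below are set):
# from google.cloud import bigquery
# client = bigquery.Client(project=PROJECT_ID)
# print(dataset_location(client, BQ_DATASET))
```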

In [None]:
# Environment variables
PROJECT_ID="<your-project-id>"
# Make sure to use the BQ dataset location here, not the Vertex AI region
LOCATION="US"
DATASET="<your-bq-dataset-name>"
BQ_DATASET=f"{PROJECT_ID}.{DATASET}"
TABLE_A=f"{BQ_DATASET}.table_a"
TABLE_B=f"{BQ_DATASET}.table_b"
BUCKET_URI="gs://<your-bucket-name>"
PIPELINE_ROOT=BUCKET_URI
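Every value above starts life as a `<your-...>` placeholder, and a forgotten one tends to surface only as a confusing error halfway through a pipeline run. A tiny guard can catch that early; `unfilled_placeholders` is a hypothetical helper written for this notebook, not an SDK function.

```python
def unfilled_placeholders(settings: dict[str, str]) -> list[str]:
    """Return the names of settings whose values still contain a "<your-...>" placeholder."""
    return [name for name, value in settings.items() if "<your-" in value]


# Example (in the notebook):
# missing = unfilled_placeholders(
#     {"PROJECT_ID": PROJECT_ID, "DATASET": DATASET, "BUCKET_URI": BUCKET_URI}
# )
# assert not missing, f"Please set: {missing}"
```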

### Quick check on BigQuery tables

This [Colab notebook](https://colab.sandbox.google.com/notebooks/bigquery.ipynb#scrollTo=LMNA-vBHPyHz) is an excellent resource for examples of how to interact with BigQuery from Colab.

In this example, we start with a simple table, `table_a`, filter it to create `table_b`, create `view_c` from `table_b`, and finally write the rows of `view_c` to a CSV file in Cloud Storage.

### Create the table

Uncomment and run the cell below if you have not created `table_a` yet.

In [None]:
%%bigquery --project <your-project-id>

# CREATE TABLE `<your-project-id>.<your-dataset>.table_a` (
#     id INT64,
#     x INT64,
#     y INT64,
#     PARTITIONDATE DATE
# );

# INSERT INTO `<your-project-id>.<your-dataset>.table_a` (id, x, y, PARTITIONDATE)
# VALUES
#     (0, 2, 3, DATE '2023-12-12'),
#     (1, 1, 2, DATE '2023-12-12'),
#     (2, 3, 4, DATE '2023-12-13'),
#     (3, 4, 5, DATE '2024-01-15');
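Outside Colab, or if you would rather not use the `%%bigquery` cell magic, the same script can be sent through the BigQuery Python client: `client.query(sql)` starts a query job and `.result()` waits for it to finish. A sketch, assuming `client` is a `google.cloud.bigquery.Client`; `sample_table_sql` and `create_sample_table` are hypothetical helpers, and the table ID is backtick-quoted because project IDs usually contain hyphens.

```python
def sample_table_sql(table_id: str) -> str:
    """Build the CREATE + INSERT script for the toy table_a used in this notebook."""
    return f"""
CREATE TABLE `{table_id}` (
    id INT64,
    x INT64,
    y INT64,
    PARTITIONDATE DATE
);

INSERT INTO `{table_id}` (id, x, y, PARTITIONDATE)
VALUES
    (0, 2, 3, DATE '2023-12-12'),
    (1, 1, 2, DATE '2023-12-12'),
    (2, 3, 4, DATE '2023-12-13'),
    (3, 4, 5, DATE '2024-01-15');
"""


def create_sample_table(client, table_id: str) -> None:
    """Run the script as a single query job and block until it completes.

    client: a google.cloud.bigquery.Client.
    """
    client.query(sample_table_sql(table_id)).result()


# Example (in the notebook):
# from google.cloud import bigquery
# create_sample_table(bigquery.Client(project=PROJECT_ID), TABLE_A)
```

BigQuery runs the two statements as one multi-statement query job, so a single `result()` call waits for both.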


### Vertex Pipelines

We initialize the Vertex SDK and do the rest from inside a pipeline.

In [None]:
from google.cloud import aiplatform as vertex_ai

REGION = "us-central1"  # Vertex AI needs a region, not the BigQuery location "US"
vertex_ai.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

Below is a custom lightweight Python component, which means we have to spell out exactly what the component does. You may prefer to jump ahead first to the Google Cloud Pipeline Components designed specifically for BigQuery, then come back to this one.

Here we leverage the kfp `dsl.component` decorator to make a lightweight Python component that includes the relevant libraries for BigQuery and Cloud Storage.

In [None]:
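from kfp import dsl


@dsl.component(
    packages_to_install=["google-cloud-bigquery", "google-cloud-storage"],
    base_image="python:3.9",
)
def write_to_gcs(
    view_name: str,
    dataset: str,
    project: str,
    bucket: str,
    blob_path: str,
    output_gcs_uri: dsl.OutputPath(str),
):
    # Import libraries from inside the component: it runs in its own container
    import csv
    import io

    from google.cloud import bigquery
    from google.cloud import storage

    # Accept either a bare view name ("view_c") or a fully qualified
    # ID ("project.dataset.view_c")
    if view_name.count(".") == 2:
        view_id = view_name
    else:
        view_id = f"{project}.{dataset}.{view_name}"

    # Extract jobs only work on tables, not views, so query the view instead
    bq_client = bigquery.Client(project=project)
    rows = bq_client.query(f"SELECT * FROM `{view_id}`").result()

    # Serialize the rows as CSV, header first
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow([field.name for field in rows.schema])
    for row in rows:
        writer.writerow(list(row.values()))

    # Accept "my-bucket" or "gs://my-bucket", and a blob path with or
    # without the "gs://my-bucket/" prefix
    bucket_name = bucket.removeprefix("gs://").rstrip("/")
    blob_name = blob_path.removeprefix(f"gs://{bucket_name}/")

    # Upload the CSV to Cloud Storage
    storage_client = storage.Client(project=project)
    blob = storage_client.bucket(bucket_name).blob(blob_name)
    blob.upload_from_string(buffer.getvalue(), content_type="text/csv")

    # An OutputPath is a local file path: write the resulting URI into it
    with open(output_gcs_uri, "w") as f:
        f.write(f"gs://{bucket_name}/{blob_name}")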

The `create_table_b_op` and `create_view_c_op` steps below both use the `BigqueryQueryJobOp` component from Google Cloud Pipeline Components.

Here is the specification:

```
v1.bigquery.BigqueryQueryJobOp(
  destination_table: dsl.Output[google.BQTable],
  gcp_resources: dsl.OutputPath(str),
  query: str = '',
  location: str = 'us-central1',
  query_parameters: list[str] = [],
  job_configuration_query: dict[str, str] = {},
  labels: dict[str, str] = {},
  encryption_spec_key_name: str = '',
  project: str = '{{$.pipeline_google_cloud_project_id}}')
```

Most of the parameters above are optional; in the example below, we only set `query`, `project` and `location`.

The component produces two outputs:

```
destination_table: dsl.Output[google.BQTable]
```
This is the table where the query results are stored. It must be set for large results that exceed the maximum response size. For queries that produce anonymous (cached) results, BigQuery populates this field.

```
gcp_resources: dsl.OutputPath(str)
```
A serialized `gcp_resources` proto that tracks the BigQuery job. For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.
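If the only goal is to land the rows of `view_c` in Cloud Storage, BigQuery's `EXPORT DATA` statement is one alternative to the custom `write_to_gcs` component: it exports query results directly to GCS, so a third `BigqueryQueryJobOp` using just `query`, `project` and `location` could do the job. A sketch under assumptions: `export_view_sql` is a hypothetical helper, and the destination URI contains a single `*` wildcard because BigQuery may split the output into several files.

```python
def export_view_sql(view_id: str, gcs_uri_pattern: str) -> str:
    """Build an EXPORT DATA statement that writes a view's rows to CSV files in GCS.

    view_id: "<project>.<dataset>.<view>"
    gcs_uri_pattern: e.g. "gs://<bucket>/view_c/*.csv" (exactly one "*" wildcard)
    """
    if gcs_uri_pattern.count("*") != 1:
        raise ValueError("EXPORT DATA needs a URI with exactly one '*' wildcard")
    return f"""
EXPORT DATA OPTIONS (
    uri = '{gcs_uri_pattern}',
    format = 'CSV',
    overwrite = true,
    header = true
) AS
SELECT * FROM `{view_id}`
"""


# Example (inside the pipeline function, after view_c has been created):
# export_op = BigqueryQueryJobOp(
#     query=export_view_sql(f"{BQ_DATASET}.view_c", f"{BUCKET_URI}/view_c/*.csv"),
#     project=PROJECT_ID,
#     location=LOCATION,
# ).after(create_view_c_op)
```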

### Set up the query

In [None]:
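# Filter table_a into table_b. Table IDs are backtick-quoted because
# project IDs usually contain hyphens.
query_a = f"""
CREATE OR REPLACE TABLE `{TABLE_B}`
AS SELECT id, x * y AS xy FROM `{TABLE_A}`
WHERE PARTITIONDATE = DATE(2023, 12, 12)
"""

# Create view_c from table_b (assumed definition: expose table_b's
# columns unchanged; adapt the SELECT as needed)
query_c = f"""
CREATE OR REPLACE VIEW `{BQ_DATASET}.view_c`
AS SELECT id, xy FROM `{TABLE_B}`
"""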

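Before running anything, it can help to know what `table_b` should contain. The plain-Python mirror below applies the logic of `query_a` to the four sample rows inserted into `table_a` earlier: keep rows whose `PARTITIONDATE` is 2023-12-12 and compute `xy = x * y`. It only illustrates the SQL; the pipeline does not run it, and `filter_and_multiply` is a hypothetical name.

```python
from datetime import date

# The sample rows inserted into table_a earlier: (id, x, y, PARTITIONDATE)
TABLE_A_ROWS = [
    (0, 2, 3, date(2023, 12, 12)),
    (1, 1, 2, date(2023, 12, 12)),
    (2, 3, 4, date(2023, 12, 13)),
    (3, 4, 5, date(2024, 1, 15)),
]


def filter_and_multiply(rows, partition_date: date) -> list[dict]:
    """Mirror of: SELECT id, x * y AS xy FROM table_a WHERE PARTITIONDATE = partition_date"""
    return [
        {"id": row_id, "xy": x * y}
        for row_id, x, y, part_date in rows
        if part_date == partition_date
    ]


print(filter_and_multiply(TABLE_A_ROWS, date(2023, 12, 12)))
# [{'id': 0, 'xy': 6}, {'id': 1, 'xy': 2}]
```

BigQuery does not guarantee row order without an `ORDER BY`, so the real `table_b` may list these two rows in either order.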
In [None]:
from kfp import dsl
from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp


@dsl.pipeline(
    name="bq-example",
    description="Showing BigQuery components",
)
def pipeline(
    query: str = query_a,
    project: str = PROJECT_ID,
    bq_location: str = LOCATION,
):
    # Step 1: filter table_a into table_b
    create_table_b_op = BigqueryQueryJobOp(
        query=query,
        project=project,
        location=bq_location,
    )

    # Step 2: create view_c from table_b. There is no data dependency
    # between the two ops, so .after() enforces the order
    create_view_c_op = BigqueryQueryJobOp(
        query=query_c,
        project=project,
        location=bq_location,
    ).after(create_table_b_op)

    # Step 3: write the rows of view_c to a CSV file in Cloud Storage
    save_view_op = write_to_gcs(
        dataset=DATASET,
        view_name="view_c",
        project=project,
        bucket=BUCKET_URI,
        blob_path="view_c.csv",
    ).after(create_view_c_op)

In [None]:
from kfp import compiler

compiler.Compiler().compile(pipeline_func=pipeline, package_path="bq.json")

Let's take a quick look at the compiled pipeline spec in `bq.json`, which also records the pipeline parameters and their defaults.

In [None]:
!cat bq.json
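Instead of reading the whole file, the pipeline's input parameters and their defaults can be pulled out with the `json` module. This sketch assumes the KFP v2 pipeline-spec layout, where parameters sit under `root.inputDefinitions.parameters` and each may carry a `defaultValue`; `pipeline_parameters` and `load_pipeline_parameters` are hypothetical helpers.

```python
import json


def pipeline_parameters(spec: dict) -> dict:
    """Map each pipeline input parameter to its default value (None if it has none).

    Assumes the KFP v2 layout: root -> inputDefinitions -> parameters.
    """
    params = spec.get("root", {}).get("inputDefinitions", {}).get("parameters", {})
    return {name: param.get("defaultValue") for name, param in params.items()}


def load_pipeline_parameters(path: str) -> dict:
    """Read a compiled pipeline spec file (e.g. bq.json) and return its parameters."""
    with open(path) as f:
        return pipeline_parameters(json.load(f))


# Example (in the notebook):
# print(load_pipeline_parameters("bq.json"))
```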

In [None]:
# Create a Vertex AI Pipeline job
job = vertex_ai.PipelineJob(
    display_name="bq-example",
    template_path="bq.json",
    pipeline_root=PIPELINE_ROOT,
    enable_caching=True,
)
# Run the pipeline job (blocks until the run finishes)
job.run()

<img src="https://github.com/rastringer/blog_notebooks/blob/main/images/bq_pipeline.png?raw=true" width="800"/>

We should now see the `.csv` file in GCS with the `view_c` results.
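To check the result from the notebook rather than the console, the Cloud Storage client can download the object and the `csv` module can parse it. A sketch, assuming `storage_client` is a `google.cloud.storage.Client` and the object name matches the `blob_path` the pipeline wrote to; `read_csv_from_gcs` is a hypothetical helper.

```python
import csv
import io


def read_csv_from_gcs(storage_client, bucket_name: str, blob_name: str) -> list[dict]:
    """Download a CSV object from Cloud Storage and return its rows as dicts."""
    blob = storage_client.bucket(bucket_name).blob(blob_name)
    if not blob.exists():
        raise FileNotFoundError(f"gs://{bucket_name}/{blob_name} not found")
    text = blob.download_as_text()
    return list(csv.DictReader(io.StringIO(text)))


# Example (in the notebook):
# from google.cloud import storage
# rows = read_csv_from_gcs(storage.Client(project=PROJECT_ID), "<your-bucket-name>", "view_c.csv")
# print(rows)
```

All values come back as strings, since CSV carries no type information.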

This can be the start of an end-to-end workflow, proceeding to model training, evaluation and deployment. For a more extensive example, please see this [notebook](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/ml_ops/stage3/get_started_with_bqml_pipeline_components.ipynb) from Google.