In [1]:
#install packages
! pip3 install --upgrade google-cloud-aiplatform \
                                 google-cloud-bigquery 

Collecting google-cloud-bigquery
  Obtaining dependency information for google-cloud-bigquery from https://files.pythonhosted.org/packages/cc/6a/d0ef792288f2fa2cfea80899a82de302b3332dfda41984fe114e2cfbf700/google_cloud_bigquery-3.11.4-py2.py3-none-any.whl.metadata
  Using cached google_cloud_bigquery-3.11.4-py2.py3-none-any.whl.metadata (8.5 kB)
[31mERROR: Could not install packages due to an OSError: [Errno 2] No such file or directory: '/opt/conda/lib/python3.10/site-packages/google_auth-2.22.0.dist-info/METADATA'
[0m[31m
[0m

In [2]:
#list config details
!gcloud config list

[core]
account = 906300363160-compute@developer.gserviceaccount.com
disable_usage_reporting = True
project = lucid-forklift-399118

Your active configuration is: [default]


In [3]:
#list project details
!gcloud projects list

PROJECT_ID             NAME  PROJECT_NUMBER
lucid-forklift-399118  demo  906300363160


In [4]:
# set project id
PROJECT_ID = "lucid-forklift-399118"  # @param {type:"string"}

# Set the project id
! gcloud config set project {PROJECT_ID}

Updated property [core/project].


In [5]:
# Location for the Cloud Storage bucket and the Vertex AI SDK / pipeline runs.
REGION = "us-central1"  # @param {type: "string"}

In [6]:
# Cloud Storage URI for intermediate artifacts (SDK staging, pipeline root).
# This cell only defines the name; the bucket itself is created by the `gsutil mb` cell.
BUCKET_URI = f"gs://demo-vertex-{PROJECT_ID}-test"  # @param {type:"string"}

In [7]:
# Run the following cell to create your Cloud Storage bucket.
! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

Creating gs://demo-vertex-lucid-forklift-399118-test/...
ServiceException: 409 A Cloud Storage bucket named 'demo-vertex-lucid-forklift-399118-test' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


In [8]:
# Service account the pipeline runs as (passed to `run(service_account=...)`)
# and that is granted object access on the bucket.
SERVICE_ACCOUNT = "906300363160-compute@developer.gserviceaccount.com"  # @param {type:"string"}

In [9]:
import json
import os

from pathlib import Path as path
from urllib.parse import urlparse

import google.cloud.aiplatform as vertex_ai
from kfp import dsl
from kfp import compiler
#from kfp.dsl import component

In [10]:
# Configure default project, location and staging bucket for all later Vertex AI SDK calls.
vertex_ai.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=BUCKET_URI,
)

In [11]:
# Grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

No changes made to gs://demo-vertex-lucid-forklift-399118-test/
No changes made to gs://demo-vertex-lucid-forklift-399118-test/


In [12]:
# setup project template folders
DATA_PATH = "data"
KFP_COMPONENTS_PATH = "components"
SRC = "src"
BUILD = "build"

!mkdir -m 777 -p {DATA_PATH} {KFP_COMPONENTS_PATH} {SRC} {BUILD}

In [13]:
# Pipeline-wide constants.
JOB_NAME = "merge-table-data"

# gs:// locations inside BUCKET_URI that are handed to the pipeline as parameters.
_bucket_url = urlparse(BUCKET_URI)
SETUP_FILE_URI = _bucket_url._replace(path="setup.py").geturl()
INPUTS_URI = _bucket_url._replace(path=f"{DATA_PATH}/raw/*.sgm").geturl()

# BigQuery objects: the history table is the MERGE target, the view is its source.
BQ_CHURN_DATASET = "CHURN"
BQ_CHURN_TABLE = "CHURN_HISTORY_RAW"
BQ_CHURN_VIEW = "CHURN_CURRENT_RAW_VIEW"

In [14]:
!mkdir -m 777 -p {KFP_COMPONENTS_PATH}/bq_merge_table_component

In [15]:
# BigQuery MERGE from the current-snapshot view (C) into the history table (H):
# matched customers get last_modified_date and the current-month indicator
# refreshed; customers not yet in H are inserted.
create_bq_merge_query = f"""
MERGE {BQ_CHURN_DATASET}.{BQ_CHURN_TABLE} H
USING {BQ_CHURN_DATASET}.{BQ_CHURN_VIEW}  C
ON H.Customer_ID = C.Customer_ID
WHEN MATCHED THEN
  UPDATE SET H.last_modified_date = CURRENT_DATE,H.srvc_prov_state_cd_ab_ind_current_month = C.srvc_prov_state_cd_ab_ind_current_month
WHEN NOT MATCHED THEN
  INSERT (Customer_ID, churn,rec_create_date) VALUES(Customer_ID, churn,rec_create_date);
"""

# Keep a copy of the query beside its component. Write into the
# `bq_merge_table_component` folder (the one made for this component) instead of
# `bq_dataset_component`, which no cell creates. Create the folder here too so this
# cell works on a fresh kernel regardless of the mkdir cell.
MERGE_SQL_DIR = os.path.join(KFP_COMPONENTS_PATH, "bq_merge_table_component")
os.makedirs(MERGE_SQL_DIR, exist_ok=True)

# The `with` block closes the file; no explicit close() is needed.
with open(os.path.join(MERGE_SQL_DIR, "create_bq_merge.sql"), "w") as sql_file:
    sql_file.write(create_bq_merge_query)

In [17]:
# Job configuration handed to the pipeline as its `job_config` parameter.
# `destinationTable` follows BigQuery's TableReference shape, which has only
# projectId, datasetId and tableId; the view name is passed separately as `bq_view`.
# NOTE(review): the pipeline body does not currently consume `job_config`. BigQuery
# is also expected to reject a destination table on DML (MERGE) jobs, so confirm
# before wiring this into the query op.
JOB_CONFIG = {
    "destinationTable": {
        "projectId": PROJECT_ID,
        "datasetId": BQ_CHURN_DATASET,
        "tableId": BQ_CHURN_TABLE,
    }
}

In [18]:
# Define the Vertex AI (KFP) pipeline: a single BigQuery job that runs the MERGE.
@dsl.pipeline(
    name="run churn table update",
    description="A batch pipeline to update the existing history table with current data",
)
def pipeline(
    create_bq_merge_query: str,
    job_name: str,
    inputs_uri: str,
    bq_dataset: str,
    bq_table: str,
    bq_view: str,
    requirements_file_path: str,
    python_file_path: str,
    setup_file_uri: str,
    temp_location: str,
    job_config: dict,
    project: str = PROJECT_ID,
    region: str = REGION,
):
    """Run the churn-history MERGE statement as one BigQuery query job.

    Args:
        create_bq_merge_query: BigQuery SQL to execute (the MERGE statement).
        project: Google Cloud project the BigQuery job runs in.
        job_name, inputs_uri, bq_dataset, bq_table, bq_view,
        requirements_file_path, python_file_path, setup_file_uri,
        temp_location, job_config, region: accepted so existing
            `parameter_values` remain valid; not used by this pipeline body.
    """
    from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp

    # Execute the MERGE (update matched history rows, insert new customers).
    # Constructing the op inside the pipeline registers it as a pipeline task.
    BigqueryQueryJobOp(
        query=create_bq_merge_query,
        project=project,
        # NOTE(review): fixed to the "US" multi-region; must match the location of
        # the dataset being merged — confirm.
        location="US",
    )

In [19]:
# Compile the pipeline into a package under BUILD/; a later cell submits it.
# KFP v2 compiles to YAML (JSON output is deprecated). The filename describes this
# churn-table pipeline; the previous name was a misspelled leftover from another template.
PIPELINE_ROOT = urlparse(BUCKET_URI)._replace(path="pipeline_root").geturl()
PIPELINE_PACKAGE = str(path(BUILD) / "churn_table_update_pipeline.yaml")
REQUIREMENTS_URI = urlparse(BUCKET_URI)._replace(path="requirements.txt").geturl()
PYTHON_FILE_URI = urlparse(BUCKET_URI)._replace(path="src/ingest_pipeline.py").geturl()

# `pipeline` must still be the @dsl.pipeline function here, not a PipelineJob object.
compiler.Compiler().compile(pipeline_func=pipeline, package_path=PIPELINE_PACKAGE)

In [22]:
# Submit the compiled pipeline to Vertex AI Pipelines; `run()` blocks until the
# job finishes. The job is bound to `pipeline_job` (not `pipeline`) so the
# @dsl.pipeline function is not shadowed and the compile cell can be re-run.
pipeline_job = vertex_ai.PipelineJob(
    display_name="data_preprocess",
    template_path=PIPELINE_PACKAGE,
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        "create_bq_merge_query": create_bq_merge_query,
        "bq_dataset": BQ_CHURN_DATASET,
        "job_name": JOB_NAME,
        "bq_table": BQ_CHURN_TABLE,
        "bq_view": BQ_CHURN_VIEW,
        "requirements_file_path": REQUIREMENTS_URI,
        "python_file_path": PYTHON_FILE_URI,
        "temp_location": PIPELINE_ROOT,
        "job_config": JOB_CONFIG,
        "setup_file_uri": SETUP_FILE_URI,
        "inputs_uri": INPUTS_URI
    },
    # Always re-run the MERGE rather than reusing a cached task result.
    enable_caching=False,
)

pipeline_job.run(service_account=SERVICE_ACCOUNT)

Creating PipelineJob
PipelineJob created. Resource name: projects/906300363160/locations/us-central1/pipelineJobs/run-churn-table-update-20230916222637
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/906300363160/locations/us-central1/pipelineJobs/run-churn-table-update-20230916222637')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/run-churn-table-update-20230916222637?project=906300363160
PipelineJob projects/906300363160/locations/us-central1/pipelineJobs/run-churn-table-update-20230916222637 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/906300363160/locations/us-central1/pipelineJobs/run-churn-table-update-20230916222637 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/906300363160/locations/us-central1/pipelineJobs/run-churn-table-update-20230916222637 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob run completed. Re

In [23]:
# delete pipeline and temp GCS buckets
#pipeline.delete()

delete_bucket = True

# delete bucket
if delete_bucket or os.getenv("IS_TESTING"):
    ! gsutil -m rm -r $BUCKET_URI

Deleting PipelineJob : projects/906300363160/locations/us-central1/pipelineJobs/run-churn-table-update-20230916222637
Delete PipelineJob  backing LRO: projects/906300363160/locations/us-central1/operations/3914729443238608896
PipelineJob deleted. . Resource name: projects/906300363160/locations/us-central1/pipelineJobs/run-churn-table-update-20230916222637
Removing gs://demo-vertex-lucid-forklift-399118-test/pipeline_root/#1694903213480800...
Removing gs://demo-vertex-lucid-forklift-399118-test/pipeline_root/906300363160/#1694903213765856...
Removing gs://demo-vertex-lucid-forklift-399118-test/pipeline_root/906300363160/run-churn-table-update-20230916222637/bigquery-query-job_3184807585303756800/#1694903214371664...
Removing gs://demo-vertex-lucid-forklift-399118-test/pipeline_root/906300363160/run-churn-table-update-20230916222637/#1694903214082635...
Removing gs://demo-vertex-lucid-forklift-399118-test/pipeline_root/906300363160/run-churn-table-update-20230916222637/bigquery-query-jo

In [29]:
# Optional: enable the Cloud Logging API for the active project (uncomment to run).
#! gcloud services enable logging.googleapis.com