# MLOps with Spark MLLib & Vertex AI Pipelines
In the prior lab modules:<br>
(1) We authored Spark code in Serverless Spark interactive notebooks in Vertex AI Workbench.<br>
(2) We then created PySpark scripts from those notebooks and tested them manually in Cloud Shell.<br>
In this notebook:<br>
(3) We will create a Vertex AI pipeline for MLOps, essentially chaining together the Serverless Spark applications we developed in (2).<br>
In subsequent modules:<br>
(4) We will create a Google Cloud Function to execute the pipeline.<br>
(5) Finally, we will call that Cloud Function from Cloud Scheduler to complete the automation.

We will compile the pipeline to JSON here and use that JSON to schedule runs with Cloud Scheduler in a later module.

Dependency: a custom container image for Serverless Spark. If you provisioned your environment with the Terraform provided in this lab, this image has already been created for you.

### 1. One time setup of dependencies
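Uncomment the cell below and run only that cell, one time, to install the necessary libraries. Then comment it out again.

Note: the imports in section 2 use the KFP v2 layout (`from kfp.dsl import ...`), so the version pins in this cell may need to be raised to a KFP 2.x release and a matching `google-cloud-pipeline-components` release in your environment.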

In [None]:
"""
!pip3 install --user --upgrade google-cloud-aiplatform==1.11.0 kfp==1.8.11 google-cloud-pipeline-components==1.0.1 --quiet --no-warn-conflicts

# Automatically restart kernel after installs
import os
if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)
"""

### 2. Variables definition

In [1]:
import random
from pathlib import Path as path
from typing import NamedTuple
import os

from google.cloud import aiplatform as vertex_ai
# from google_cloud_pipeline_components import aiplatform as vertex_ai_components
# from kfp.v2 import compiler, dsl
# from kfp.v2.dsl import (Artifact, ClassificationMetrics, Condition, Input,
#                         Metrics, Output, component)

In [2]:
from kfp import compiler, dsl
from kfp.dsl import (Artifact, ClassificationMetrics, Condition, Input,Metrics, Output, component)
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,ModelDeployOp)
from google_cloud_pipeline_components.v1.model import ModelUploadOp

#### a. Project specifics

In [3]:
import os

PROJECT_ID = "vertex-ai-382806"
PROJECT_NBR = "433578906282"
UNIQUE_ID = random.randint(1, 10000)
WITHOUT_TASK_CACHING = True
BYO_NETWORK = True

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    project_id_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = project_id_output[0]
    print("Project ID: ", PROJECT_ID)
    
    
    project_nbr_output = !gcloud projects describe $PROJECT_ID --format='value(projectNumber)'
    PROJECT_NBR = project_nbr_output[0]
    print("Project Number: ", PROJECT_NBR)
    
umsa_output = !gcloud config list account --format "value(core.account)"
UMSA_FQN = umsa_output[0]
print("UMSA FQN: ", UMSA_FQN)
print("UNIQUE ID: ", UNIQUE_ID)

!gcloud config set project $PROJECT_ID

Project ID:  vertex-ai-382806
Project Number:  433578906282
UMSA FQN:  433578906282-compute@developer.gserviceaccount.com
UNIQUE ID:  3759
Updated property [core/project].


#### b. Local resources

In [4]:
APP_BASE_NM = "customer-churn-model"

In [5]:
LOCAL_SCRATCH_DIR = path(f"/home/jupyter/scratch/{APP_BASE_NM}/")

In [6]:
!mkdir -m 777 -p $LOCAL_SCRATCH_DIR

In [7]:
!ls -al $LOCAL_SCRATCH_DIR

total 164
drwxrwxrwx 2 jupyter jupyter  4096 Jun  3 06:26 .
drwxr-xr-x 3 jupyter jupyter  4096 May 29 10:41 ..
-rw-r--r-- 1 jupyter jupyter 31298 May 29 12:47 pipeline_1257.json
-rw-r--r-- 1 jupyter jupyter 25656 Jun  1 21:37 pipeline_1736.json
-rw-r--r-- 1 jupyter jupyter 25811 Jun  2 09:04 pipeline_4032.json
-rw-r--r-- 1 jupyter jupyter 33061 Jun  3 07:08 pipeline_8452.json
-rw-r--r-- 1 jupyter jupyter 31298 Jun  1 14:48 pipeline_8899.json


In [8]:
!cd $LOCAL_SCRATCH_DIR && pwd

/home/jupyter/scratch/customer-churn-model


In [9]:
!ls -al /home/jupyter

total 104
drwxr-xr-x 14 jupyter jupyter  4096 Jun  5 13:13 .
drwxr-xr-x  3 root    root     4096 May 27 15:20 ..
-rw-------  1 jupyter jupyter  2797 Jun  3 20:20 .bash_history
-rw-r--r--  1 jupyter jupyter   431 May 27 15:21 .bashrc
drwxr-xr-x  5 jupyter jupyter  4096 May 27 15:24 .cache
drwxr-xr-x  4 jupyter jupyter  4096 May 27 15:21 .config
drwxr-xr-x  2 jupyter jupyter  4096 May 29 11:01 .docker
drwxr-xr-x  3 jupyter jupyter  4096 May 29 11:01 .gsutil
drwxr-xr-x  2 jupyter jupyter  4096 Jun  4 14:24 .ipynb_checkpoints
drwxr-xr-x  3 jupyter jupyter  4096 May 27 15:20 .ipython
drwxr-xr-x  4 jupyter jupyter  4096 May 28 07:54 .jupyter
drwxr-xr-x  2 jupyter jupyter  4096 May 28 08:52 .keras
drwxr-xr-x  5 jupyter jupyter  4096 May 27 15:34 .local
-rw-------  1 jupyter jupyter  1354 May 29 14:10 .viminfo
-rw-r--r--  1 jupyter jupyter 24494 Jun  5 13:13 RunInference_test_beam.ipynb
-rw-r--r--  1 jupyter jupyter  6264 Jun  3 12:28 beam_file.py
drwxr-xr-x  2 jupyter jupyter  4096 Jun  1 19:

#### c. The pre-created resources

In [10]:
CODE_BUCKET = f"gs://s8s_code_bucket-{PROJECT_NBR}"
DATA_BUCKET = f"gs://s8s_data_bucket-{PROJECT_NBR}"
MODEL_BUCKET = f"gs://s8s_model_bucket-{PROJECT_NBR}"
SCRATCH_BUCKET = f"s8s-spark-bucket-{PROJECT_NBR}"
BQ_DS_NM = f"{PROJECT_ID}.customer_churn_ds"
LOCATION = "us-central1"
VPC_NM = f"s8s-vpc-{PROJECT_NBR}"
SUBNET_RESOURCE_URI = f"projects/{PROJECT_ID}/regions/{LOCATION}/subnetworks/spark-snet"
PERSISTENT_SPARK_HISTORY_SERVER_RESOURCE_URI = f"projects/{PROJECT_ID}/regions/{LOCATION}/clusters/s8s-sphs-{PROJECT_NBR}"
GCR_REPO_NM = f"s8s-spark-{PROJECT_NBR}"
DOCKER_IMAGE_TAG = "1.0.0"
DOCKER_IMAGE_NM = "customer_churn_image"
DOCKER_IMAGE_FQN = f"us-central1-docker.pkg.dev/{PROJECT_ID}/s8s-spark/{DOCKER_IMAGE_NM}:{DOCKER_IMAGE_TAG}"

#### d. Pipeline entity specific

In [11]:
PIPELINE_ID = UNIQUE_ID
PIPELINE_NM = f"{APP_BASE_NM}-pipeline"
PIPELINE_PACKAGE_SRC_LOCAL_PATH = f"{LOCAL_SCRATCH_DIR}/pipeline_{PIPELINE_ID}.json"
PIPELINE_ROOT_GCS_URI = f"{MODEL_BUCKET}/{APP_BASE_NM}/pipelines"

print('PIPELINE_ID =',PIPELINE_ID)
print('PIPELINE_NM =',PIPELINE_NM)
print('PIPELINE_PACKAGE_SRC_LOCAL_PATH =',PIPELINE_PACKAGE_SRC_LOCAL_PATH)
print('PIPELINE_ROOT_GCS_URI =',PIPELINE_ROOT_GCS_URI)

PIPELINE_ID = 3759
PIPELINE_NM = customer-churn-model-pipeline
PIPELINE_PACKAGE_SRC_LOCAL_PATH = /home/jupyter/scratch/customer-churn-model/pipeline_3759.json
PIPELINE_ROOT_GCS_URI = gs://s8s_model_bucket-433578906282/customer-churn-model/pipelines
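
The naming scheme in this cell can be captured in a small helper, which makes it easier to see how a single run ID fans out into the local package path and the GCS pipeline root. This is a minimal sketch: `build_pipeline_names` and its parameter names are illustrative and are not part of the lab code.

```python
from pathlib import PurePosixPath


def build_pipeline_names(app_base_nm: str, pipeline_id: int,
                         scratch_dir: str, model_bucket: str) -> dict:
    """Derive the pipeline name, local JSON path, and GCS root for one run."""
    return {
        "pipeline_nm": f"{app_base_nm}-pipeline",
        # One compiled JSON per run ID, so earlier compilations are not overwritten.
        "package_path": str(PurePosixPath(scratch_dir) / f"pipeline_{pipeline_id}.json"),
        "pipeline_root": f"{model_bucket}/{app_base_nm}/pipelines",
    }


# Values copied from the cell output above.
names = build_pipeline_names(
    app_base_nm="customer-churn-model",
    pipeline_id=3759,
    scratch_dir="/home/jupyter/scratch/customer-churn-model/",
    model_bucket="gs://s8s_model_bucket-433578906282",
)
for key, value in names.items():
    print(f"{key} = {value}")
```

Because the run ID is part of the file name, each compilation leaves its own `pipeline_<id>.json` in the scratch directory, which matches the directory listing shown earlier.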


#### e. Pipeline stage agnostic

In [12]:
DATAPROC_S8S_RUNTIME="2.1"
PY_SCRIPTS_FQP = f"{CODE_BUCKET}/pyspark"
PYSPARK_COMMON_UTILS_SCRIPT_FQP = [f"{PY_SCRIPTS_FQP}/common_utils.py"]

print('PY_SCRIPTS_FQP =',PY_SCRIPTS_FQP)
print('PYSPARK_COMMON_UTILS_SCRIPT_FQP =',PYSPARK_COMMON_UTILS_SCRIPT_FQP)

PY_SCRIPTS_FQP = gs://s8s_code_bucket-433578906282/pyspark
PYSPARK_COMMON_UTILS_SCRIPT_FQP = ['gs://s8s_code_bucket-433578906282/pyspark/common_utils.py']


#### f. Data preprocessing stage specific

In [13]:
DATA_PREPROCESSING_BATCH_PREFIX = "preprocessing"
DATA_PREPROCESSING_BATCH_INSTANCE_ID = f"{APP_BASE_NM}-{DATA_PREPROCESSING_BATCH_PREFIX}-{UNIQUE_ID}"
DATA_PREPROCESSING_MAIN_PY_SCRIPT = f"{PY_SCRIPTS_FQP}/preprocessing.py"

DATA_PROCESSING_SINK = f"{BQ_DS_NM}.training_data"
DATA_PROCESSING_BQ_SINK_URI = f"bq://{DATA_PROCESSING_SINK}"

# Arguments passed through to the preprocessing PySpark script
DATA_PREPROCESSING_ARGS = [
    f"--pipelineID={UNIQUE_ID}",
    f"--projectID={PROJECT_ID}",
    f"--projectNbr={PROJECT_NBR}",
    "--displayPrintStatements=True",
]

print('DATA_PREPROCESSING_BATCH_INSTANCE_ID =',DATA_PREPROCESSING_BATCH_INSTANCE_ID)
print('DATA_PREPROCESSING_MAIN_PY_SCRIPT =',DATA_PREPROCESSING_MAIN_PY_SCRIPT)
print('DATA_PROCESSING_SINK =',DATA_PROCESSING_SINK)
print('DATA_PROCESSING_BQ_SINK_URI =',DATA_PROCESSING_BQ_SINK_URI)
print('DATA_PREPROCESSING_ARGS =',DATA_PREPROCESSING_ARGS)

DATA_PREPROCESSING_BATCH_INSTANCE_ID = customer-churn-model-preprocessing-3759
DATA_PREPROCESSING_MAIN_PY_SCRIPT = gs://s8s_code_bucket-433578906282/pyspark/preprocessing.py
DATA_PROCESSING_SINK = vertex-ai-382806.customer_churn_ds.training_data
DATA_PROCESSING_BQ_SINK_URI = bq://vertex-ai-382806.customer_churn_ds.training_data
DATA_PREPROCESSING_ARGS = ['--pipelineID=3759', '--projectID=vertex-ai-382806', '--projectNbr=433578906282', '--displayPrintStatements=True']
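
The PySpark scripts that receive these arguments are not shown in this notebook. As a hedged sketch of how a script such as `preprocessing.py` might read them, the example below uses `argparse`. The function name `parse_stage_args` and the string-to-boolean conversion are assumptions made for illustration; the actual lab scripts may parse their arguments differently.

```python
import argparse


def parse_stage_args(argv: list) -> argparse.Namespace:
    """Parse the --key=value arguments that the pipeline passes to a stage."""
    parser = argparse.ArgumentParser(description="Hypothetical stage argument parser")
    parser.add_argument("--pipelineID", required=True)
    parser.add_argument("--projectID", required=True)
    parser.add_argument("--projectNbr", required=True)
    # The flag arrives as the string "True" or "False", so convert it explicitly.
    parser.add_argument(
        "--displayPrintStatements",
        type=lambda s: s.strip().lower() == "true",
        default=False,
    )
    return parser.parse_args(argv)


# Same argument list as DATA_PREPROCESSING_ARGS in the output above.
stage_args = parse_stage_args([
    "--pipelineID=3759",
    "--projectID=vertex-ai-382806",
    "--projectNbr=433578906282",
    "--displayPrintStatements=True",
])
print(stage_args.pipelineID, stage_args.projectID, stage_args.displayPrintStatements)
```

Note that every value is formatted into a string before it reaches the script, so numeric or boolean arguments need an explicit conversion on the receiving side.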


#### g. Dataset registration specific

In [14]:
MANAGED_DATASET_NM = f"{APP_BASE_NM}-{UNIQUE_ID}"

#### h. Model specific

In [15]:
MODEL_TRAINING_BATCH_PREFIX = "training"
MODEL_TRAINING_BATCH_INSTANCE_ID = f"{APP_BASE_NM}-{MODEL_TRAINING_BATCH_PREFIX}-{UNIQUE_ID}"
MODEL_TRAINING_MAIN_PY_SCRIPT = f"{PY_SCRIPTS_FQP}/model_training.py"
# Arguments passed through to the model training PySpark script
MODEL_TRAINING_ARGS = [
    f"--pipelineID={UNIQUE_ID}",
    f"--projectID={PROJECT_ID}",
    f"--projectNbr={PROJECT_NBR}",
    "--displayPrintStatements=True",
]

MODEL_METRICS_BUCKET_FQP = f"gs://s8s_metrics_bucket-{PROJECT_NBR}/{APP_BASE_NM}/{MODEL_TRAINING_BATCH_PREFIX}/{UNIQUE_ID}/full/metrics.json"

print('MODEL_TRAINING_BATCH_INSTANCE_ID =',MODEL_TRAINING_BATCH_INSTANCE_ID)
print('MODEL_TRAINING_MAIN_PY_SCRIPT =',MODEL_TRAINING_MAIN_PY_SCRIPT)
print('MODEL_TRAINING_ARGS =',MODEL_TRAINING_ARGS)
print('MODEL_METRICS_BUCKET_FQP =',MODEL_METRICS_BUCKET_FQP)

MODEL_TRAINING_BATCH_INSTANCE_ID = customer-churn-model-training-3759
MODEL_TRAINING_MAIN_PY_SCRIPT = gs://s8s_code_bucket-433578906282/pyspark/model_training.py
MODEL_TRAINING_ARGS = ['--pipelineID=3759', '--projectID=vertex-ai-382806', '--projectNbr=433578906282', '--displayPrintStatements=True']
MODEL_METRICS_BUCKET_FQP = gs://s8s_metrics_bucket-433578906282/customer-churn-model/training/3759/full/metrics.json


#### i. Hyperparameter tuning specific

In [16]:
# Condition
AUPR_THRESHOLD = 0.5
AUPR_HYPERTUNE_CONDITION = "[AUPR_HYPERTUNE]"

HYPERPARAMETER_TUNING_BATCH_PREFIX = "hyperparameter-tuning"
HYPERPARAMETER_TUNING_BATCH_INSTANCE_ID = f"{APP_BASE_NM}-{HYPERPARAMETER_TUNING_BATCH_PREFIX}-{UNIQUE_ID}"
# Arguments passed through to the hyperparameter tuning PySpark script
HYPERPARAMETER_TUNING_ARGS = [
    f"--pipelineID={UNIQUE_ID}",
    f"--projectID={PROJECT_ID}",
    f"--projectNbr={PROJECT_NBR}",
    "--displayPrintStatements=True",
]

HYPERPARAMETER_TUNING_MAIN_PY_SCRIPT = f"{PY_SCRIPTS_FQP}/hyperparameter_tuning.py"
HYPERPARAMETER_TUNING_BUCKET_FQP = f"gs://s8s_metrics_bucket-{PROJECT_NBR}/{APP_BASE_NM}/{HYPERPARAMETER_TUNING_BATCH_PREFIX}/{UNIQUE_ID}/full/metrics.json"
HYPERPARAMETER_TUNING_RUNTIME_CONFIGS = {
    "spark.jars.packages": "ml.combust.mleap:mleap-spark-base_2.12:0.20.0,ml.combust.mleap:mleap-spark_2.12:0.20.0"
}


print('HYPERPARAMETER_TUNING_BATCH_INSTANCE_ID =',HYPERPARAMETER_TUNING_BATCH_INSTANCE_ID)
print('HYPERPARAMETER_TUNING_MAIN_PY_SCRIPT =',HYPERPARAMETER_TUNING_MAIN_PY_SCRIPT)
print('HYPERPARAMETER_TUNING_ARGS =',HYPERPARAMETER_TUNING_ARGS)
print('HYPERPARAMETER_TUNING_RUNTIME_CONFIGS =',HYPERPARAMETER_TUNING_RUNTIME_CONFIGS)

HYPERPARAMETER_TUNING_BATCH_INSTANCE_ID = customer-churn-model-hyperparameter-tuning-3759
HYPERPARAMETER_TUNING_MAIN_PY_SCRIPT = gs://s8s_code_bucket-433578906282/pyspark/hyperparameter_tuning.py
HYPERPARAMETER_TUNING_ARGS = ['--pipelineID=3759', '--projectID=vertex-ai-382806', '--projectNbr=433578906282', '--displayPrintStatements=True']
HYPERPARAMETER_TUNING_RUNTIME_CONFIGS = {'spark.jars.packages': 'ml.combust.mleap:mleap-spark-base_2.12:0.20.0,ml.combust.mleap:mleap-spark_2.12:0.20.0'}
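
`AUPR_THRESHOLD` and `AUPR_HYPERTUNE_CONDITION` are defined here, but the pipelines in this notebook do not wire them into a conditional step. As a rough illustration of the gate they suggest, the sketch below compares an area-under-PR value against the threshold in plain Python. The function name and the direction of the comparison (tune only when the metric meets or exceeds the threshold) are assumptions, not something this notebook confirms.

```python
AUPR_THRESHOLD = 0.5  # same value as in the cell above


def should_run_hyperparameter_tuning(area_under_pr: float,
                                     threshold: float = AUPR_THRESHOLD) -> bool:
    """Assumed gate: continue to tuning only when the model meets the AUPR threshold."""
    return area_under_pr >= threshold


# Made-up metric values, used only to exercise the gate.
for aupr in (0.62, 0.41):
    print(aupr, "->", should_run_hyperparameter_tuning(aupr))
```

In a pipeline definition, the same comparison would typically sit inside a conditional block on the evaluation component's `threshold_metric` output rather than in a Python `if`.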


### 3. Initialize Vertex AI SDK for Python

In [18]:
vertex_ai.init(project=PROJECT_ID, location=LOCATION, staging_bucket=SCRATCH_BUCKET)

### 4. Define custom components

In [19]:
@component(
    base_image="python:3.10",
    # scikit-learn 0.24.2 predates Python 3.10; 1.3.2 is an assumed replacement that supports it
    packages_to_install=["numpy==1.24.4", "pandas==2.2.2", "scikit-learn==1.3.2"],
)
def fnEvaluateModel(
    metricsUri: str,
    metrics: Output[Metrics],
    plots: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("threshold_metric", float)]):

    import json
    import numpy as np
    from sklearn.metrics import confusion_matrix, roc_curve

    # Variables
    metricsGCSMountPath = metricsUri.replace("gs://", "/gcs/")
    labels = ["yes", "no"]

    # Helpers
    def fnCalculateROC(metrics, true, score):
        y_true_np = np.array(metrics[true])
        y_score_np = np.array(metrics[score])
        fpr, tpr, thresholds = roc_curve(
            y_true=y_true_np, y_score=y_score_np, pos_label=True
        )
        return fpr, tpr, thresholds

    def fnCalculateConfusionMatrix(metrics, true, prediction):
        y_true_np = np.array(metrics[true])
        y_pred_np = np.array(metrics[prediction])
        c_matrix = confusion_matrix(y_true_np, y_pred_np)
        return c_matrix

    # Main
    with open(metricsGCSMountPath, mode="r") as json_file:
        metricsDictionary = json.load(json_file)

    area_roc = metricsDictionary["test_area_roc"]
    area_prc = metricsDictionary["test_area_prc"]
    acc = metricsDictionary["test_accuracy"]
    f1 = metricsDictionary["test_f1"]
    prec = metricsDictionary["test_precision"]
    rec = metricsDictionary["test_recall"]

    metrics.log_metric("Test_areaUnderROC", area_roc)
    metrics.log_metric("Test_areaUnderPRC", area_prc)
    metrics.log_metric("Test_Accuracy", acc)
    metrics.log_metric("Test_f1-score", f1)
    metrics.log_metric("Test_Precision", prec)
    metrics.log_metric("Test_Recall", rec)

    fpr, tpr, thresholds = fnCalculateROC(metricsDictionary, "true", "score")
    c_matrix = fnCalculateConfusionMatrix(metricsDictionary, "true", "prediction")
    plots.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())
    plots.log_confusion_matrix(labels, c_matrix.tolist())

    componentOutputsTuple = NamedTuple(
        "Outputs",
        [
            ("threshold_metric", float),
        ],
    )
    return componentOutputsTuple(area_prc)
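
The component above expects a `metrics.json` file with a specific set of keys and reads it through the `/gcs/` mount path rather than the `gs://` URI. The sketch below shows that path conversion and a metrics document with the keys the component reads. All numeric values are made up for illustration, and the helper name `gcs_uri_to_mount_path` is hypothetical.

```python
import json
import os
import tempfile


def gcs_uri_to_mount_path(uri: str) -> str:
    """Map gs://bucket/object to /gcs/bucket/object, as the component does."""
    return uri.replace("gs://", "/gcs/")


# Keys that fnEvaluateModel reads from the metrics file.
required_keys = {
    "test_area_roc", "test_area_prc", "test_accuracy",
    "test_f1", "test_precision", "test_recall",
    "true", "score", "prediction",
}

# Hypothetical, tiny metrics document; the values are placeholders.
sample_metrics = {
    "test_area_roc": 0.81,
    "test_area_prc": 0.63,
    "test_accuracy": 0.78,
    "test_f1": 0.74,
    "test_precision": 0.76,
    "test_recall": 0.72,
    "true": [1, 0, 1, 0],
    "score": [0.9, 0.2, 0.6, 0.4],
    "prediction": [1, 0, 1, 0],
}

# Round-trip through a temporary file to mimic reading metrics.json from disk.
with tempfile.TemporaryDirectory() as tmp_dir:
    metrics_path = os.path.join(tmp_dir, "metrics.json")
    with open(metrics_path, "w") as fh:
        json.dump(sample_metrics, fh)
    with open(metrics_path) as fh:
        loaded = json.load(fh)

missing = required_keys - set(loaded)
print("mount path:", gcs_uri_to_mount_path("gs://example-bucket/metrics.json"))
print("missing keys:", sorted(missing))
```

A check like this can be run against a real metrics file before the pipeline is submitted, since a missing key would otherwise surface only as a `KeyError` inside the component.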


### 5. Define Vertex AI Pipeline

##### Option 1: In this version we do not disable task-level caching

In [20]:
@dsl.pipeline(
    name=PIPELINE_NM, 
    description="A MLOps Vertex pipeline")
def fnSparkMlopsPipeline(
    project_id: str = PROJECT_ID,
    location: str = LOCATION,
    service_account: str = UMSA_FQN,
    subnetwork_uri: str = SUBNET_RESOURCE_URI,
    spark_phs_nm: str = PERSISTENT_SPARK_HISTORY_SERVER_RESOURCE_URI,
    container_image: str = DOCKER_IMAGE_FQN,
    common_utils_py_fqn: list = PYSPARK_COMMON_UTILS_SCRIPT_FQP,
    data_preprocessing_pyspark_batch_id: str = DATA_PREPROCESSING_BATCH_INSTANCE_ID,
    data_preprocessing_main_py_fqn: str = DATA_PREPROCESSING_MAIN_PY_SCRIPT,
    data_preprocessing_args: list = DATA_PREPROCESSING_ARGS,
    managed_dataset_display_nm: str = MANAGED_DATASET_NM,
    managed_dataset_src_uri: str = DATA_PROCESSING_BQ_SINK_URI,
    model_training_pyspark_batch_id: str = MODEL_TRAINING_BATCH_INSTANCE_ID,
    model_training_main_py_fqn: str = MODEL_TRAINING_MAIN_PY_SCRIPT,
    model_training_metrics_fqp: str = MODEL_METRICS_BUCKET_FQP,
    model_training_args: list = MODEL_TRAINING_ARGS,
    threshold: float = AUPR_THRESHOLD,
    hyperparameter_tuning_pyspark_batch_id: str = HYPERPARAMETER_TUNING_BATCH_INSTANCE_ID,
    hyperparameter_tuning_main_py_fqn: str = HYPERPARAMETER_TUNING_MAIN_PY_SCRIPT,
    hyperparameter_tuning_args: list = HYPERPARAMETER_TUNING_ARGS,
    hyperparameter_tuning_metrics_fqp: str = MODEL_METRICS_BUCKET_FQP,
    hyperparameter_tuning_runtime_config_properties: dict = HYPERPARAMETER_TUNING_RUNTIME_CONFIGS,
    dataproc_runtime_version: str = DATAPROC_S8S_RUNTIME
):
    from google_cloud_pipeline_components.v1.dataproc import \
        DataprocPySparkBatchOp

    # Step 1. PRE-PROCESS DATA in PREP FOR MODEL TRAINING
    # ....................................................................
    preprocessingStep = DataprocPySparkBatchOp(
        project = project_id,
        location = location,
        container_image = container_image,
        subnetwork_uri = subnetwork_uri,
        spark_history_dataproc_cluster = spark_phs_nm,
        service_account = service_account,     
        batch_id = data_preprocessing_pyspark_batch_id,
        main_python_file_uri = data_preprocessing_main_py_fqn,
        python_file_uris = common_utils_py_fqn,
        args = data_preprocessing_args,
        runtime_config_version = dataproc_runtime_version
    ).set_display_name("Preprocessing")
    
    print(preprocessingStep.outputs)
    
    from google_cloud_pipeline_components.v1.dataset import \
        TabularDatasetCreateOp
    # Step 2. REGISTER PRE-PROCESSED DATA AS MANAGED DATASET
    # ....................................................................
    
    createManagedDatasetStep = TabularDatasetCreateOp(
        display_name= managed_dataset_display_nm,
        bq_source=managed_dataset_src_uri,
        project=project_id,
        location=location,
    ).after(preprocessingStep).set_display_name("Dataset registration")
    
    print(createManagedDatasetStep.outputs)
    
    # Step 3. TRAIN MODEL
    # .................................................................... 
    trainSparkMLModelStep = DataprocPySparkBatchOp(
        project = project_id,
        location = location,
        container_image = container_image,
        subnetwork_uri = subnetwork_uri,
        spark_history_dataproc_cluster = spark_phs_nm,
        service_account = service_account,     
        batch_id = model_training_pyspark_batch_id,
        main_python_file_uri = model_training_main_py_fqn,
        python_file_uris = common_utils_py_fqn,
        args = model_training_args,
        runtime_config_properties = hyperparameter_tuning_runtime_config_properties,
        runtime_config_version = dataproc_runtime_version
    ).after(preprocessingStep).set_display_name("Model training")
    
    print(trainSparkMLModelStep.outputs)
    
    from google_cloud_pipeline_components.v1.model import ModelUploadOp
    from kfp.dsl import importer_node
    
    importer_spec = importer_node.importer(
      artifact_uri='gs://s8s_model_bucket-433578906282/customer-churn-model',
      artifact_class=artifact_types.UnmanagedContainerModel,
      metadata={
          'containerSpec': {
              'imageUri':
                  'us-central1-docker.pkg.dev/vertex-ai-382806/churn-tabular/prediction-server:prod'
          }
      })
    
    model_task = ModelUploadOp(
        project = project_id,
        location = location,
        display_name='spark-churn-model',
        unmanaged_container_model=importer_spec.outputs["artifact"],
    ).after(trainSparkMLModelStep).set_display_name("Model Upload")


{'gcp_resources': {{channel:task=dataproc-create-pyspark-batch;name=gcp_resources;type=String;}}}
{'dataset': {{channel:task=tabular-dataset-create;name=dataset;type=google.VertexDataset@0.0.1;}}}
{'gcp_resources': {{channel:task=dataproc-create-pyspark-batch-2;name=gcp_resources;type=String;}}}


##### Option 2: In this version we disable task-level caching
Prefer this option for scheduled execution, so that every scheduled run re-executes each step instead of reusing cached results.
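
To see why caching matters for scheduled runs, consider the toy model below. It is a deliberately simplified sketch in plain Python and is not how Vertex AI Pipelines implements caching; the names `run_task`, `executed`, and `_cache` are hypothetical.

```python
executed = []   # records which tasks actually ran
_cache = {}     # maps a cache key to a previously produced output


def run_task(name: str, inputs: dict, enable_caching: bool) -> str:
    """Run a fake task, optionally reusing the output of an identical earlier call."""
    key = (name, tuple(sorted(inputs.items())))
    if enable_caching and key in _cache:
        return _cache[key]  # cache hit: the task body is skipped
    executed.append(name)
    output = f"{name}-output-{len(executed)}"
    _cache[key] = output
    return output


# Two "scheduled" runs with identical inputs and caching enabled.
for _ in range(2):
    run_task("preprocessing", {"table": "training_data"}, enable_caching=True)
runs_with_caching = list(executed)

# The same two runs with caching disabled.
executed.clear()
for _ in range(2):
    run_task("preprocessing", {"table": "training_data"}, enable_caching=False)
runs_without_caching = list(executed)

print(runs_with_caching)
print(runs_without_caching)
```

With caching enabled, the second run never executes the task, which is undesirable when the underlying data has changed between scheduled runs even though the step inputs look identical.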

In [21]:
@dsl.pipeline(
    name=PIPELINE_NM, 
    description="A SparkMLlib MLOps Vertex pipeline")
def fnSparkMlopsPipelineWithoutCaching(
    project_id: str = PROJECT_ID,
    location: str = LOCATION,
    service_account: str = UMSA_FQN,
    subnetwork_uri: str = SUBNET_RESOURCE_URI,
    spark_phs_nm: str = PERSISTENT_SPARK_HISTORY_SERVER_RESOURCE_URI,
    container_image: str = DOCKER_IMAGE_FQN,
    common_utils_py_fqn: list = PYSPARK_COMMON_UTILS_SCRIPT_FQP,
    data_preprocessing_pyspark_batch_id: str = DATA_PREPROCESSING_BATCH_INSTANCE_ID,
    data_preprocessing_main_py_fqn: str = DATA_PREPROCESSING_MAIN_PY_SCRIPT,
    data_preprocessing_args: list = DATA_PREPROCESSING_ARGS,
    managed_dataset_display_nm: str = MANAGED_DATASET_NM,
    managed_dataset_src_uri: str = DATA_PROCESSING_BQ_SINK_URI,
    model_training_pyspark_batch_id: str = MODEL_TRAINING_BATCH_INSTANCE_ID,
    model_training_main_py_fqn: str = MODEL_TRAINING_MAIN_PY_SCRIPT,
    model_training_metrics_fqp: str = MODEL_METRICS_BUCKET_FQP,
    model_training_args: list = MODEL_TRAINING_ARGS,
    threshold: float = AUPR_THRESHOLD,
    hyperparameter_tuning_pyspark_batch_id: str = HYPERPARAMETER_TUNING_BATCH_INSTANCE_ID,
    hyperparameter_tuning_main_py_fqn: str = HYPERPARAMETER_TUNING_MAIN_PY_SCRIPT,
    hyperparameter_tuning_args: list = HYPERPARAMETER_TUNING_ARGS,
    hyperparameter_tuning_metrics_fqp: str = MODEL_METRICS_BUCKET_FQP,
    hyperparameter_tuning_runtime_config_properties: dict = HYPERPARAMETER_TUNING_RUNTIME_CONFIGS,
    dataproc_runtime_version: str = DATAPROC_S8S_RUNTIME
):
    from google_cloud_pipeline_components.v1.dataproc import DataprocPySparkBatchOp

    # Step 1. PRE-PROCESS DATA in PREP FOR MODEL TRAINING
    # ....................................................................
    preprocessingStep = DataprocPySparkBatchOp(
        project = project_id,
        location = location,
        container_image = container_image,
        subnetwork_uri = subnetwork_uri,
        spark_history_dataproc_cluster = spark_phs_nm,
        service_account = service_account,     
        batch_id = data_preprocessing_pyspark_batch_id,
        main_python_file_uri = data_preprocessing_main_py_fqn,
        python_file_uris = common_utils_py_fqn,
        args = data_preprocessing_args,
        runtime_config_version = dataproc_runtime_version
    ).set_caching_options(False).set_display_name("Preprocessing")
    
    from google_cloud_pipeline_components.v1.dataset import TabularDatasetCreateOp
    # Step 2. REGISTER PRE-PROCESSED DATA AS MANAGED DATASET
    # ....................................................................
    createManagedDatasetStep = TabularDatasetCreateOp(
        display_name= managed_dataset_display_nm,
        bq_source=managed_dataset_src_uri,
        project=project_id,
        location=location,
    ).after(preprocessingStep).set_caching_options(False).set_display_name("Dataset registration")
    
    # Step 3. TRAIN MODEL
    # .................................................................... 
    trainSparkMLModelStep = DataprocPySparkBatchOp(
        project = project_id,
        location = location,
        container_image = container_image,
        subnetwork_uri = subnetwork_uri,
        spark_history_dataproc_cluster = spark_phs_nm,
        service_account = service_account,     
        batch_id = model_training_pyspark_batch_id,
        main_python_file_uri = model_training_main_py_fqn,
        python_file_uris = common_utils_py_fqn,
        args = model_training_args,
        runtime_config_version = dataproc_runtime_version,
        runtime_config_properties = hyperparameter_tuning_runtime_config_properties,
    ).set_caching_options(False).after(preprocessingStep).set_display_name("Model training")
    
    from google_cloud_pipeline_components.v1.model import ModelUploadOp
    from kfp.dsl import importer_node

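    # Step 4. UPLOAD THE MODEL
    # ....................................................................
    # Import the trained model artifacts together with the serving container spec
    importer_spec = importer_node.importer(
      artifact_uri='gs://s8s_model_bucket-433578906282/customer-churn-model',
      artifact_class=artifact_types.UnmanagedContainerModel,
      metadata={
          'containerSpec': {
              'imageUri':
                  'us-central1-docker.pkg.dev/vertex-ai-382806/churn-tabular/prediction-server:prod'
          }
      }).after(trainSparkMLModelStep).set_display_name("Model import")

    # Upload the imported artifact as a Vertex AI model, mirroring Option 1
    model_task = ModelUploadOp(
        project = project_id,
        location = location,
        display_name='spark-churn-model',
        unmanaged_container_model=importer_spec.outputs["artifact"],
    ).set_caching_options(False).set_display_name("Model Upload")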



### 6. Compile the Vertex AI Pipeline into JSON

In [22]:
if WITHOUT_TASK_CACHING:
    compiler.Compiler().compile(pipeline_func=fnSparkMlopsPipelineWithoutCaching, package_path=PIPELINE_PACKAGE_SRC_LOCAL_PATH)
    print("Executing fnSparkMlopsPipelineWithoutCaching")
else:
    compiler.Compiler().compile(pipeline_func=fnSparkMlopsPipeline, package_path=PIPELINE_PACKAGE_SRC_LOCAL_PATH)
    print("Executing fnSparkMlopsPipeline")

Executing fnSparkMlopsPipelineWithoutCaching
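
After compiling, it can be useful to confirm which tasks ended up in the JSON before submitting it. The sketch below lists the task names from a pipeline spec. It assumes the tasks live under `root.dag.tasks`, optionally wrapped in a top-level `pipelineSpec` key depending on the SDK version. The helper `list_pipeline_tasks` is hypothetical, and the sample spec is a hand-written stand-in for the compiled file rather than real compiler output.

```python
import json


def list_pipeline_tasks(spec: dict) -> list:
    """Return the task names under root.dag.tasks, sorted alphabetically."""
    # Assumption: some SDK versions wrap the spec in a top-level "pipelineSpec" key.
    pipeline_spec = spec.get("pipelineSpec", spec)
    tasks = pipeline_spec.get("root", {}).get("dag", {}).get("tasks", {})
    return sorted(tasks)


# Hand-written stand-in for the compiled JSON; task names are taken from
# the channel names printed by the Option 1 pipeline cell.
sample_spec = json.loads("""
{
  "pipelineInfo": {"name": "customer-churn-model-pipeline"},
  "root": {"dag": {"tasks": {
    "dataproc-create-pyspark-batch": {},
    "tabular-dataset-create": {},
    "dataproc-create-pyspark-batch-2": {}
  }}}
}
""")
print(list_pipeline_tasks(sample_spec))
```

To inspect the real file, load it with `json.load` from `PIPELINE_PACKAGE_SRC_LOCAL_PATH` and pass the resulting dictionary to the same function.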


### 7. Create the pipeline

In [23]:
pipeline = vertex_ai.PipelineJob(
    display_name=PIPELINE_NM,
    template_path=PIPELINE_PACKAGE_SRC_LOCAL_PATH,
    pipeline_root=PIPELINE_ROOT_GCS_URI,
    enable_caching=False
)

### 8. Submit the pipeline for execution
There are two options below: one uses a customer-specified VPC network that is peered with the Vertex AI tenant network, and the other uses the Vertex AI tenant network alone.

In [24]:
if BYO_NETWORK:
    pipeline.submit(service_account=UMSA_FQN, network=f"projects/{PROJECT_NBR}/global/networks/{VPC_NM}")
else:
    pipeline.submit(service_account=UMSA_FQN)

Creating PipelineJob
PipelineJob created. Resource name: projects/433578906282/locations/us-central1/pipelineJobs/customer-churn-model-pipeline-20240605131604
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/433578906282/locations/us-central1/pipelineJobs/customer-churn-model-pipeline-20240605131604')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/customer-churn-model-pipeline-20240605131604?project=433578906282
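
The two submission branches differ only in whether a `network` argument is passed. One way to make that explicit is to build the keyword arguments first and submit once. The helper below is a hypothetical sketch that only builds the dictionary and does not call Vertex AI. The network path format is the one used in the cell above, and the VPC name in the example follows the `s8s-vpc-{PROJECT_NBR}` pattern defined earlier.

```python
def build_submit_kwargs(service_account: str, byo_network: bool,
                        project_nbr: str, vpc_nm: str) -> dict:
    """Build keyword arguments for PipelineJob.submit for either network option."""
    kwargs = {"service_account": service_account}
    if byo_network:
        # The network is addressed by project number, as in the cell above.
        kwargs["network"] = f"projects/{project_nbr}/global/networks/{vpc_nm}"
    return kwargs


peered = build_submit_kwargs(
    service_account="433578906282-compute@developer.gserviceaccount.com",
    byo_network=True,
    project_nbr="433578906282",
    vpc_nm="s8s-vpc-433578906282",
)
tenant_only = build_submit_kwargs(
    service_account="433578906282-compute@developer.gserviceaccount.com",
    byo_network=False,
    project_nbr="433578906282",
    vpc_nm="s8s-vpc-433578906282",
)
print(peered)
print(tenant_only)
```

In the notebook, the result could then be passed as `pipeline.submit(**kwargs)`, which keeps a single submit call regardless of the value of `BYO_NETWORK`.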


In [32]:
!pip show --version numpy

Name: numpy
Version: 1.24.4
Summary: Fundamental package for array computing in Python
Home-page: https://www.numpy.org
Author: Travis E. Oliphant et al.
Author-email: 
License: BSD-3-Clause
Location: /opt/conda/envs/tensorflow/lib/python3.10/site-packages
Requires: 
Required-by: apache-beam, contourpy, db-dtypes, explainable-ai-sdk, gymnasium, h5py, ImageHash, imageio, jax-jumpy, matplotlib, numba, opt-einsum, pandas, patsy, phik, pyarrow, PyWavelets, scikit-image, scikit-learn, scipy, seaborn, shapely, statsmodels, tensorboard, tensorboardX, tensorflow, tensorflow-datasets, tensorflow-hub, tensorflow-probability, tensorflow-transform, tifffile, visions, wordcloud, ydata-profiling


### 9. How do we automate this?

To automate this, we will take the compiled JSON, (1) test-run it from the Vertex AI page in the Cloud Console, and then (2) schedule it with Cloud Scheduler, which calls a Cloud Function that in turn calls the Vertex AI REST endpoint for pipelines.<br>This is covered in the next lab module.<br>
Note: The JSON below contains your project details. The lab author keeps a de-identified version, with placeholders for your information, in the git repo at ../04-templates/\*.json. As part of the Terraform automation, a version customized with everything except the pipeline ID is placed in 05-pipelines in your local directory in Cloud Shell, and a copy is placed in GCS at s8s-pipelines-bucket-YOUR_PROJECT_NUMBER/templates. At scheduled run time, the Cloud Function generates a new pipeline ID, substitutes it into the JSON, and writes the result to s8s-pipelines-bucket-YOUR_PROJECT_NUMBER/execution; that file is then used to launch the pipeline.
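
The run-time substitution described in the note can be sketched in a few lines. This is an illustration only: the placeholder token `PIPELINE_ID_PLACEHOLDER`, the function name, and the one-line template are hypothetical, and the actual Cloud Function in the later module may work differently.

```python
import random


def render_pipeline_template(template_text: str, placeholder: str, pipeline_id: int) -> str:
    """Replace every occurrence of the placeholder with the new pipeline ID."""
    if placeholder not in template_text:
        raise ValueError(f"placeholder {placeholder!r} not found in template")
    return template_text.replace(placeholder, str(pipeline_id))


# Hypothetical one-line template; a real template would be the full pipeline JSON.
template = '{"batchId": "customer-churn-model-preprocessing-PIPELINE_ID_PLACEHOLDER"}'

# A fresh ID per scheduled run, generated the same way as UNIQUE_ID in this notebook.
new_pipeline_id = random.randint(1, 10000)
rendered = render_pipeline_template(template, "PIPELINE_ID_PLACEHOLDER", new_pipeline_id)
print(rendered)
```

Generating a new ID for every run keeps identifiers such as Dataproc batch IDs unique across scheduled executions.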

In [None]:
!cat $PIPELINE_PACKAGE_SRC_LOCAL_PATH