## Testing Vertex Pipeline for Segment 108 using Managed Dataset

In [1]:
from kfp.v2.dsl import (component, Input, Artifact, Output, ClassificationMetrics, Model, Metrics, Dataset)

### Define Model Evaluation Component

In [2]:
from typing import NamedTuple

@component(
    # NOTE(review): `proptrainer` (imported inside the component) is not in
    # packages_to_install, so it must be provided by the base image. The
    # commented-out training image below presumably contains it -- confirm.
    # base_image="gcr.io/dw-analytics-d01/propimage:0.1-pipe",
    packages_to_install=["pandas==1.1.5",
                         # The PyPI distribution is `scikit-learn`; `sklearn`
                         # is a placeholder package with no 0.24.2 release.
                         "scikit-learn==0.24.2",
                         "xgboost==1.5.2",
                         "google-cloud-storage",
                         # Needed for `aiplatform.TabularDataset` below.
                         "google-cloud-aiplatform",
                         # Extras pull in the pandas/Arrow/storage-API
                         # dependencies used by `to_dataframe(bqstorage_client=...)`.
                         "google-cloud-bigquery[bqstorage,pandas]",
                         "google-cloud-bigquery-storage"]
)
def evaluate_model(
    dataset: Input[Artifact],
    segment: int,
    threshold: float,
    # model: Input[Model],
    gcs_artifact_uri: str,
    metrics: Output[Metrics],
    class_metrics: Output[ClassificationMetrics],
) -> NamedTuple("output", [("passed", str)]):
    """Score the trained model on out-of-time data and log evaluation metrics.

    Args:
        dataset: Vertex managed-dataset artifact; its URI embeds the
            ``projects/...`` resource name of a TabularDataset backed by BigQuery.
        segment: ``em_segment`` value to evaluate; also the key (as a string)
            into ``proptrainer.features.feature_lookup`` for the input columns.
        threshold: the model passes only if its AUC is strictly greater.
        gcs_artifact_uri: GCS folder holding ``model.joblib`` and ``scaler.pkl``.
        metrics: receives the scalar ``auc`` metric.
        class_metrics: receives the confusion matrix and ROC curve.

    Returns:
        NamedTuple whose ``passed`` field is the string ``"true"`` when the
        AUC exceeds ``threshold`` and ``"false"`` otherwise.

    Raises:
        ValueError: if the evaluation sample does not contain both target classes.
    """
    import pickle

    import joblib
    import numpy as np
    from google.cloud import aiplatform
    from google.cloud import bigquery
    from google.cloud import bigquery_storage
    from google.cloud import storage
    from sklearn.metrics import confusion_matrix, roc_auc_score, roc_curve
    from sklearn.utils.class_weight import compute_sample_weight

    from proptrainer import features
    from proptrainer.model import preprocess

    def threshold_check(value, threshold):
        """Return "true" if value is strictly greater than threshold, else "false"."""
        return "true" if value > threshold else "false"

    class CprPredictor(object):
        """Wraps a scaler + model pair downloaded from GCS."""

        def __init__(self):
            return

        def load(self, gcs_artifacts_uri: str):
            """Download model.joblib and scaler.pkl from the GCS folder and load them."""
            gcs_client = storage.Client()
            with open('model.joblib', 'wb') as gcs_model, open('scaler.pkl', 'wb') as gcs_scaler:
                gcs_client.download_blob_to_file(
                    f"{gcs_artifacts_uri}/model.joblib", gcs_model)
                gcs_client.download_blob_to_file(f"{gcs_artifacts_uri}/scaler.pkl",
                                                 gcs_scaler)
            # pickle/joblib execute code on load: only point this at artifacts
            # written by our own training job.
            with open('scaler.pkl', 'rb') as scal:
                scaler = pickle.load(scal)

            self._model = joblib.load("model.joblib")
            self._scaler = scaler

        def predict(self, instances):
            """Scale the inputs and return the model's class predictions."""
            scaled_inputs = self._scaler.transform(instances)
            return self._model.predict(scaled_inputs)

        def predict_proba(self, instances):
            """Scale the inputs and return the column-1 (positive-class) probabilities."""
            scaled_inputs = self._scaler.transform(instances)
            return self._model.predict_proba(scaled_inputs)[:, 1]

    # Resolve the managed dataset from the artifact URI and find its backing
    # BigQuery table (metadata uri looks like "bq://project.dataset.table").
    eval_dataset = aiplatform.TabularDataset('projects/' +
                                             dataset.uri.split('projects/')[-1])
    table_id = eval_dataset._gca_resource.metadata.get("inputConfig").get(
        "bigquerySource").get("uri").split('//')[-1]

    bqclient = bigquery.Client()
    bqstorageclient = bigquery_storage.BigQueryReadClient()

    # `segment` is an int-typed component input, so interpolating it is safe.
    # The fingerprint filter keeps a deterministic ~2% sample of coupons.
    query = f"""
    SELECT
    *
    FROM
    (SELECT
    *
    FROM `{table_id}`
    WHERE em_segment = {segment}
    AND IN_HOME_DT >= DATE '2021-06-01')
    WHERE MOD(ABS(FARM_FINGERPRINT(CAST(COUPON_BARCODE AS STRING))), 100) < 2
    """

    eval_d = bqclient.query(query).result().to_dataframe(
        bqstorage_client=bqstorageclient)
    eval_d = preprocess(eval_d)

    columns = features.feature_lookup[str(segment)]
    inputs = eval_d[columns]
    target = eval_d["TARGET_14"]

    # AUC and the base rate below are meaningless without both classes; fail
    # loudly here instead of with an opaque KeyError/ValueError later.
    classes = np.unique(target)
    if len(classes) < 2:
        raise ValueError(
            f"Evaluation sample for segment {segment} must contain both target "
            f"classes; found {classes.tolist()} in {len(target)} rows.")

    predictor = CprPredictor()
    predictor.load(gcs_artifact_uri)
    probabilities = predictor.predict_proba(inputs)

    # Classify at the positive-class base rate rather than 0.5.
    prior = round((target == 1).sum() / len(target), 5)
    # confusion_matrix needs one weight per row, not a class->weight mapping;
    # "balanced" weights make each class contribute equally to the matrix.
    sample_weight = compute_sample_weight(class_weight="balanced", y=target)
    cm = confusion_matrix(target,
                          (probabilities > prior).astype(int),
                          labels=classes,
                          sample_weight=sample_weight)
    categories = ["0", "1"]
    class_metrics.log_confusion_matrix(categories=categories, matrix=cm.tolist())

    fpr, tpr, thresholds = roc_curve(target, probabilities)
    class_metrics.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())

    test_auc = roc_auc_score(target, probabilities)
    metrics.log_metric("auc", test_auc)
    passed = threshold_check(test_auc, threshold)

    # This would ideally gate a model upload, but CustomContainerTraining
    # already uploads the model.
    return (passed,)

### Define Pipeline

In [3]:
from google_cloud_pipeline_components import aiplatform as gcc_aip
import kfp.v2 as kfp

@kfp.dsl.pipeline(
    name="prop-pipeline"
)
def pipeline(
    project: str,
    region: str,
    segment: int,
    model_display_name: str,
    args: str,
    auc_threshold: float,
    training_image_uri: str,
    base_output_dir: str,
    gcs_artifact_uri: str,
    serving_container_image_uri: str,
    # machine_type: str,
    staging_bucket: str,
):
    """Create datasets, train the propensity model, then evaluate it out-of-time.

    Args:
        project: GCP project for the datasets and the training job.
        region: Vertex AI location of the training job.
        segment: model segment handed to the evaluation component.
        model_display_name: display name for the model the training job uploads.
        args: training-container command-line arguments; expected to arrive as
            a JSON-encoded list of strings (this parameter is string-typed).
        auc_threshold: the evaluation passes only when AUC is strictly greater.
        training_image_uri: custom training container image.
        base_output_dir: GCS output folder for the training job.
        gcs_artifact_uri: GCS folder the evaluation step loads the model from.
        serving_container_image_uri: serving image for the uploaded model.
        staging_bucket: staging bucket for the training job.
    """
    # Create Vertex managed datasets.
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        display_name="Propensity-Training-Data",
        bq_source="bq://dw-bq-data-d00.SANDBOX_ANALYTICS.dm_pc_tiny_data",  # change to larger dataset once test is complete
        project=project,
    )

    testset_create_op = gcc_aip.TabularDatasetCreateOp(
        display_name="Out-of-Time-Test-Data",
        bq_source="bq://dw-bq-data-d00.SANDBOX_ANALYTICS.dm_pc_refresh_eval_data_w_margin",
        project=project,
    )

    # Train the model with a custom container. Consuming
    # dataset_create_op.outputs already orders this task after dataset creation.
    train_op = gcc_aip.CustomContainerTrainingJobRunOp(
        display_name="prop-model-108-gcc-training",
        dataset=dataset_create_op.outputs["dataset"],
        training_fraction_split=0.79,
        validation_fraction_split=0.2,
        test_fraction_split=0.01,
        bigquery_destination="bq://dw-bq-data-d00",
        container_uri=training_image_uri,
        model_display_name=model_display_name,
        model_serving_container_image_uri=serving_container_image_uri,
        model_serving_container_predict_route="/predict",
        model_serving_container_health_route="/health",
        base_output_dir=base_output_dir,
        args=args,
        project=project,
        location=region,
        staging_bucket=staging_bucket,
    )

    # Evaluate the model on out-of-time data.
    eval_op = evaluate_model(
        dataset=testset_create_op.outputs["dataset"],
        segment=segment,
        threshold=auc_threshold,
        gcs_artifact_uri=gcs_artifact_uri,
    )

    # The evaluator reads the model from gcs_artifact_uri (a plain string), so
    # there is no data edge to training; the ordering must be explicit.
    eval_op.after(train_op)

    # Check condition for model upload to Vertex
    # with kfp.dsl.Condition(
    #     eval_op.outputs["passed"] == "true",
    #     name="upload_model",
    # ):
    #     model_upload_op = gcc_aip.ModelUploadOp(
    #         project=project,
    #         display_name=model_display_name,
    #         artifact_uri=gcs_artifact_uri,
    #         serving_container_image_uri=serving_container_image_uri,
    #         serving_container_health_route="/health",
    #         serving_container_predict_route="/predict",
    #     )

### Compile and run Pipeline

In [4]:
from google.cloud.aiplatform import PipelineJob
from kfp.v2 import compiler
# from kfp.v2.google.client import AIPlatformClient
from datetime import datetime

import json

# Each run writes to its own timestamped folder under the pipeline root.
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
project = "dw-analytics-d01"
region = "us-central1"
segment = 108
bucket = "gs://ai-ml-vertex-d01"
gcs_path = "dm-propensity/pc"
# evaluate_model passes only when AUC is strictly greater than this value.
auc_threshold = 0.60999
pipeline_root_path = f"{bucket}/{gcs_path}/{segment}/pipeline_root"
working_dir = f"{pipeline_root_path}/{timestamp}"
model_display_name = "prop-108-model"
path = "prop-108.json"
training_image_uri = "gcr.io/dw-analytics-d01/propimage:0.1-pipe"
serving_container_image_uri = "gcr.io/dw-analytics-d01/propimage:0.1-predict"
pipeline_display_name = "prop-108-vertex-training-pipeline"
# The evaluation step loads the model from the training job's
# base_output_dir (working_dir) + "/model".
gcs_artifact_uri = f"{working_dir}/model"
# Not passed to the pipeline (see the commented --dataset arg below); the
# pipeline hard-codes its BigQuery sources. Kept for reference.
dataset = "dw-bq-data-d00.SANDBOX_ANALYTICS.dm_pc_tiny_data"
staging_bucket = bucket

# hyperparameters (args) for custom container training
max_depth = 3
min_child_weight = 3
max_delta_step = 0.5
reg_lambda = 0.15
reg_alpha = 0.3
lr = 0.3
gamma = 0.0075

CMDARGS = [
    f"--segment={segment}",
    # f"--dataset={dataset}",
    f"--max_depth={max_depth}",
    f"--min_child_weight={min_child_weight}",
    f"--max_delta_step={max_delta_step}",
    f"--reg_lambda={reg_lambda}",
    f"--reg_alpha={reg_alpha}",
    f"--gamma={gamma}",
    f"--lr={lr}",
]

compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=path,
)

job = PipelineJob(
    display_name=pipeline_display_name,
    template_path=path,
    pipeline_root=pipeline_root_path,
    location=region,
    parameter_values={
        "project": project,
        "base_output_dir": working_dir,
        "region": region,
        "staging_bucket": staging_bucket,
        "segment": segment,
        "model_display_name": model_display_name,
        "auc_threshold": auc_threshold,
        "training_image_uri": training_image_uri,
        "gcs_artifact_uri": gcs_artifact_uri,
        "serving_container_image_uri": serving_container_image_uri,
        # The pipeline declares `args: str`; send an explicit JSON-encoded
        # list instead of relying on implicit serialization of a Python list.
        "args": json.dumps(CMDARGS),
    }
)

job.run(
    service_account="dev-ana-ainb-admin@dw-analytics-d01.iam.gserviceaccount.com"
)

Creating PipelineJob




PipelineJob created. Resource name: projects/134453458552/locations/us-central1/pipelineJobs/prop-pipeline-20220725192831
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/134453458552/locations/us-central1/pipelineJobs/prop-pipeline-20220725192831')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/prop-pipeline-20220725192831?project=134453458552
PipelineJob projects/134453458552/locations/us-central1/pipelineJobs/prop-pipeline-20220725192831 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/134453458552/locations/us-central1/pipelineJobs/prop-pipeline-20220725192831 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/134453458552/locations/us-central1/pipelineJobs/prop-pipeline-20220725192831 current state:
PipelineState.PIPELINE_STATE_RUNNING


RuntimeError: Job failed with:
code: 9
message: "The DAG failed because some tasks failed. The failed tasks are: [customcontainertrainingjob-run].; Job (project_id = dw-analytics-d01, job_id = 543086176353910784) is failed due to the above error.; Failed to handle the job: {project_number = 134453458552, job_id = 543086176353910784}"
