## Initialization

In [1]:
%%capture
!pip install --user kfp #--user
!pip install --user google-cloud-pipeline-components #--user

In [2]:
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import pipeline, component, Artifact, Dataset, Input, Metrics, Model, Output, InputPath, OutputPath

import google.cloud.aiplatform as aip
from google_cloud_pipeline_components import aiplatform as gcc_aip
import json

from datetime import datetime

In [3]:
# Replace the <...> placeholders with real values before running.
PROJECT_ID = '<project_id>'
REGION = "us-central1"
# GCS prefix under which all pipeline artifacts are written.
BUCKET_NAME = "gs://<bucket>/custom_pipeline"
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"

## Dataset Preparation Component

In [4]:
## Download BigQuery data and convert to CSV

@component(
    packages_to_install=["google-cloud-bigquery", "pandas", "pyarrow", "db-dtypes"],
    base_image="python:3.9",
    output_component_file="create_dataset.yaml"
)
def get_dataframe(
    bq_table: str, project: str,
    output_data_path: OutputPath("Dataset")
):
    """Export a whole BigQuery table to a CSV dataset artifact.

    Args:
        bq_table: Table ID in ``project.dataset.table`` form.
        project: Project the BigQuery client is created for.
        output_data_path: Local file path, provided by KFP, that receives the CSV.
    """
    # KFP lightweight components ship only this function's source, so imports
    # must live inside the body. pandas only needs to be installed (it is in
    # packages_to_install) for to_dataframe(); it is not imported directly.
    from google.cloud import bigquery

    bqclient = bigquery.Client(project=project)
    table = bigquery.TableReference.from_string(bq_table)
    # list_rows reads the table directly instead of running a SELECT * query job.
    rows = bqclient.list_rows(table)
    # Request a BigQuery Storage API client for faster downloads when available.
    dataframe = rows.to_dataframe(create_bqstorage_client=True)
    # index=False: the default RangeIndex carries no information and would
    # otherwise come back as an extra "Unnamed: 0" column when the CSV is read.
    dataframe.to_csv(output_data_path, index=False)
    

## Model Training & Evaluation Component

In [5]:
## Train an XGBoost model

@component(
    packages_to_install=["joblib", "imblearn", "pyarrow==5.0", "google-auth-oauthlib==0.4.1", "dill==0.3.1.1", "httplib2==0.19"],
    base_image="us-docker.pkg.dev/vertex-ai/training/tf-gpu.2-8:latest",
    output_component_file="train_model_component.yaml",
)
def xgb_train(
    dataset: Input[Dataset],
    metrics: Output[Metrics],
    model: Output[Model]
):
    """Train an XGBoost fraud classifier on a randomly over-sampled training split.

    Reads the input CSV, holds out 20% as a test split, over-samples the minority
    class of the training split with ``RandomOverSampler``, fits an
    ``XGBClassifier`` with ``tree_method='gpu_hist'`` and logs test-split scores.

    Args:
        dataset: CSV containing the feature columns listed below plus ``is_fraud``.
        metrics: Receives accuracy, precision, recall, F1 and dataset sizes.
        model: The fitted classifier is dumped with joblib to ``<model.path>.joblib``.
    """
    # KFP lightweight components ship only this function's source, so all
    # imports must live inside the body.
    from joblib import dump
    from sklearn.metrics import classification_report, confusion_matrix, f1_score, precision_score, recall_score
    from sklearn.model_selection import train_test_split
    import xgboost as xgb
    from imblearn.over_sampling import RandomOverSampler
    import pandas as pd

    feature_columns = [
        'card_transactions_amount',
        'card_transactions_transaction_distance',
        'card_transactions_transaction_hour_of_day',
        'category',
    ]
    label_column = 'is_fraud'

    print("xgb_train start running...")
    df = pd.read_csv(dataset.path)
    # enable_categorical=True below works on pandas "category" columns.
    df["category"] = df["category"].astype("category")
    X_train, X_test, y_train, y_test = train_test_split(
        df[feature_columns], df[label_column], test_size=.2, random_state=0)

    # Balance classes on the training split only; the test split keeps the
    # original class distribution so the scores below are not inflated.
    ros = RandomOverSampler(sampling_strategy='minority', random_state=999)
    X_res, y_res = ros.fit_resample(X_train, y_train)
    # Re-apply the categorical dtype in case resampling did not preserve it.
    X_res["category"] = X_res["category"].astype("category")

    print("Define XGBoost Model started...")
    xgb_model = xgb.XGBClassifier(
        eval_metric="aucpr",
        learning_rate=0.123,
        max_depth=10,
        enable_categorical=True,
        use_label_encoder=False,
        tree_method='gpu_hist',  # needs a GPU on the node running this step
        random_state=0,
    )
    print("Define XGBoost Model completed...")

    print("model fit checkpoint started...")
    xgb_model.fit(X_res, y_res)
    print("model fit checkpoint completed...")

    print("metrics calculation started...")
    accuracy = xgb_model.score(X_test, y_test)
    y_pred = xgb_model.predict(X_test)
    # Result names differ from the sklearn functions so those are not shadowed.
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    # log_metric takes scalar values, so the matrix and report go to the step log.
    print(confusion_matrix(y_test, y_pred))
    print(classification_report(y_test, y_pred, target_names=['not Fraud', 'Fraud']))
    print("metrics calculation completed...")

    print("metrics logging started...")
    metrics.log_metric("framework", xgb.__version__)
    metrics.log_metric("accuracy", accuracy)
    metrics.log_metric("precision", precision)
    metrics.log_metric("recall", recall)
    metrics.log_metric("F1", f1)
    # Key kept for comparability with earlier runs. Note that it is the size of
    # the full input table; no SMOTE is used (RandomOverSampler is).
    metrics.log_metric("SMOTE_dataset_size", len(df))
    metrics.log_metric("resampled_train_size", len(X_res))
    print("metrics logging completed...")

    # With the output named "model" this writes <artifact dir>/model.joblib,
    # the file name the prebuilt Vertex AI scikit-learn/XGBoost serving images
    # look for.
    print(model.path)
    dump(xgb_model, model.path + ".joblib")
    print("model uploaded and xgb_train completed...")

## Upload Model Component 

In [6]:
## Upload model to model registry

@component(
    packages_to_install=["google-cloud-aiplatform"],
    base_image="python:3.9",
    output_component_file="model_upload_component.yaml",
)
def upload_model(
    model: Input[Model],
    project: str,
    region: str,
    vertex_model: Output[Model]
):
    """Register the trained model in the Vertex AI Model Registry.

    Args:
        model: Training output; its artifact directory holds ``model.joblib``.
        project: GCP project to upload the model into.
        region: Vertex AI location to upload the model into.
        vertex_model: Its ``uri`` is set to the uploaded model's resource name.
    """
    # KFP lightweight components ship only this function's source, so the
    # import must live inside the body.
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=region)

    # The serving image needs the *directory* that contains model.joblib.
    # Drop only the final path segment ("model"); str.replace("model", "")
    # would also strip "model" from any other part of the URI (bucket name,
    # pipeline root, ...), producing a wrong artifact location.
    artifact_dir = model.uri.rsplit("/", 1)[0] + "/"

    uploaded_model = aiplatform.Model.upload(
        display_name="xgb-model-pipeline",
        artifact_uri=artifact_dir,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-5:latest"
    )

    vertex_model.uri = uploaded_model.resource_name
    


## Kubeflow Pipeline 

In [7]:
## Pipeline

# `dsl.pipeline` rather than the bare `pipeline` import: the function below
# rebinds the name `pipeline`, so re-running this cell with `@pipeline(...)`
# would call the previous pipeline function instead of the decorator.
@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline.
    name="demo-pipeline",
)
def pipeline(
    bq_table: str = "",
    output_data_path: str = "new_train.csv",
    project: str = PROJECT_ID,
    region: str = REGION
):
    """BigQuery export -> GPU XGBoost training -> Vertex AI Model Registry upload.

    Args:
        bq_table: Source table ID; the default is empty, so pass it at submit time.
        output_data_path: Not used by any step; kept so existing job parameters
            remain valid.
        project: GCP project passed to the export and upload steps.
        region: Vertex AI location passed to the upload step.
    """
    dataset_task = get_dataframe(bq_table=bq_table, project=project)

    # xgb_train uses tree_method='gpu_hist', so it needs a GPU.
    # add_node_selector_constraint only sets the accelerator *type*;
    # set_gpu_limit sets how many accelerators to attach.
    model_task = (
        xgb_train(dataset=dataset_task.output)
        .set_gpu_limit(1)
        .add_node_selector_constraint('cloud.google.com/gke-accelerator', 'NVIDIA_TESLA_T4')
    )

    upload_model(
        model=model_task.outputs["model"],
        project=project,
        region=region,
    )

In [8]:
# Serialize the pipeline definition to the JSON spec that aip.PipelineJob loads.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    package_path="demo_pipeline.json",
    pipeline_func=pipeline,
)



In [9]:
# Suffix that keeps job IDs distinct across runs (local time, one-second resolution).
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

job_parameters = {"bq_table": "<BQ_TABLE_ID>"}  # replace with the source table ID

demo = aip.PipelineJob(
    display_name="demo-pipeline",
    template_path="demo_pipeline.json",
    job_id=f"demo-pipeline-{TIMESTAMP}",
    parameter_values=job_parameters,
    # NOTE(review): with caching on, an unchanged get_dataframe step may be
    # reused and not re-read the table even if its contents changed.
    enable_caching=True,
)

In [10]:
# Identity the pipeline runs as; replace <project_id>.
# submit() starts the run without blocking until it finishes.
service_account = "vertex-managed-notebook@<project_id>.iam.gserviceaccount.com"
demo.submit(
    service_account=service_account,
)

Creating PipelineJob
PipelineJob created. Resource name: projects/16838415269/locations/us-central1/pipelineJobs/demo-pipeline-20220809103627
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/16838415269/locations/us-central1/pipelineJobs/demo-pipeline-20220809103627')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/demo-pipeline-20220809103627?project=16838415269
