# Create & Deploy Vertex-AI Pipeline w/ Kubeflow

Install the needed libraries in order to run the code locally

In [1]:
%%capture
# Install the packages this notebook needs; the %%capture cell magic above
# silences pip's output.
!pip3 install google-cloud-aiplatform==1.0.0 --upgrade
!pip3 install kfp google-cloud-pipeline-components==0.1.1 --upgrade
!pip3 install scikit-learn
# NOTE(review): this unpinned upgrade runs after the ==1.0.0 install above and
# presumably replaces that version with the latest release -- confirm which
# version is actually intended.
!pip3 install google-cloud-aiplatform --upgrade
!pip3 install pandas
!pip3 install python-dotenv

You might need to restart the kernel after running the installation cell above for the first time.

In [78]:
import uuid
from kfp import dsl
import kfp
from google.cloud import aiplatform
from kfp.v2.dsl import component
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, component, Metrics)
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

Load some preset environment variables saved in a local file. Create one of your own by following these instructions: https://stackoverflow.com/a/54028874

In [51]:
#https://stackoverflow.com/a/54028874
%load_ext dotenv
%dotenv

import os
PROJECT_ID = os.environ['PROJECT_ID']
BUCKET_NAME = os.environ['BUCKET']

PIPELINE_ROOT = 'gs://{}/pipeline_root'.format(BUCKET_NAME)
REGION = 'us-central1'

print(PROJECT_ID)
print(BUCKET_NAME)
print(PIPELINE_ROOT)

The dotenv extension is already loaded. To reload it, use:
  %reload_ext dotenv
kubeflow-demos
user-group-demo
gs://user-group-demo/pipeline_root


## 1. Create a component for reading data from BQ into CSV

In [52]:
@component(packages_to_install=["pandas", "google-cloud-aiplatform", "google-cloud-bigquery-storage","google-cloud-bigquery","pyarrow"])
def preprocess(output_csv_path: OutputPath('CSV')):
    """Export the telco churn table from BigQuery to a CSV file.

    Runs ``SELECT *`` against ``kubeflow-demos.telco.churn`` using the
    application-default credentials and writes the result to
    ``output_csv_path``.

    Args:
        output_csv_path: Local path, supplied by the pipeline runtime, where
            the CSV output is written.
    """
    # Imports are kept inside the body, next to the code that uses them.
    from google.cloud import bigquery
    import google.auth

    # The project returned by google.auth.default() is not used; the client is
    # pinned to the project that hosts the dataset.
    creds, _ = google.auth.default()
    client = bigquery.Client(project='kubeflow-demos', credentials=creds)

    query = """
            SELECT * FROM `kubeflow-demos.telco.churn`
    """
    print(query)

    dataframe = client.query(query).to_dataframe()
    print(dataframe.head())

    # index=False: do not write the positional index, otherwise the consumer's
    # pd.read_csv sees it as an extra unnamed column.
    dataframe.to_csv(output_csv_path, index=False)
    print("done")

## 2. Create a component to train

In [188]:
# NOTE(review): scikit-learn and joblib are not listed below; presumably they
# are pulled in as dependencies of imbalanced-learn -- confirm.
@component(packages_to_install=["pandas", "imbalanced-learn", "google-cloud-aiplatform", "pyarrow"])
def train(input_csv_path: InputPath('CSV'), saved_model: Output[Model], artifact_uri: OutputPath(str), accuracy: Output[Metrics], model_type: str, project_id: str, bucket: str):
    """Train one scikit-learn classifier on the churn CSV and export it.

    Steps: label-encode every object-typed column, select the feature
    columns, min-max scale them to [0, 1], split 70/30, fit the requested
    model, dump it as ``model.joblib`` next to ``saved_model`` and log the
    held-out accuracy.

    Args:
        input_csv_path: Path of the CSV file to train on.
        saved_model: Output model artifact; its path/uri (with the
            "saved_model" segment stripped) determine where ``model.joblib``
            is written and which URI is reported.
        artifact_uri: Output file that receives the URI of the directory
            holding ``model.joblib``.
        accuracy: Output metrics artifact; receives the key ``accuracy`` with
            the test-set accuracy as a percentage (0-100).
        model_type: One of ``'linear_regression'`` (which, despite the name,
            selects ``LogisticRegression``), ``'naive_bayes'`` or
            ``'decision_tree'``.
        project_id: Currently unused.
        bucket: Currently unused.

    Raises:
        ValueError: If ``model_type`` is not one of the supported values.
    """
    # All imports are local to the function body so the component does not
    # depend on names from the notebook's namespace (the original used `os`
    # without importing it here).
    import os

    import joblib
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import LabelEncoder, MinMaxScaler

    df = pd.read_csv(input_csv_path)
    print(len(df))

    # Encode every object-typed (string) column as integer labels.
    for c in df.columns:
        if df[c].dtype == 'object':
            lbl = LabelEncoder()
            lbl.fit(list(df[c].values))
            df[c] = lbl.transform(df[c].values)
    print(df.head())  # To check if properly encoded

    # Keep only the columns used as features.
    X = df[['Contract', 'tenure', 'TechSupport', 'OnlineSecurity', 'TotalCharges', 'PaperlessBilling',
            'DeviceProtection', 'Dependents', 'OnlineBackup', 'SeniorCitizen', 'MonthlyCharges',
            'PaymentMethod', 'Partner', 'PhoneService']]
    y = df['Churn']

    # Scale all the features to the range 0 to 1, keeping the column names.
    features = X.columns.values
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaler.fit(X)
    X = pd.DataFrame(scaler.transform(X))
    X.columns = features

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=101)

    # Choose which model to train.
    if model_type == 'linear_regression':
        from sklearn.linear_model import LogisticRegression
        model = LogisticRegression()
    elif model_type == 'naive_bayes':
        from sklearn.naive_bayes import GaussianNB
        model = GaussianNB()
    elif model_type == 'decision_tree':
        from sklearn.tree import DecisionTreeClassifier
        model = DecisionTreeClassifier()
    else:
        # Fail with a clear message instead of a NameError on the unbound `model`.
        raise ValueError(
            "Unsupported model_type {!r}; expected 'linear_regression', "
            "'naive_bayes' or 'decision_tree'".format(model_type))

    model.fit(X_train, y_train)

    # Save the model next to the artifact path (the "saved_model" segment is
    # stripped so the file lands in the enclosing directory).
    model_dir = saved_model.path.replace("saved_model", "")
    # Make sure the target directory exists before writing into it.
    os.makedirs(model_dir or ".", exist_ok=True)
    joblib.dump(model, os.path.join(model_dir, 'model.joblib'))
    print(" saved_model.path: "+ saved_model.path)
    print(" saved_model.uri: "+ saved_model.uri)
    with open(artifact_uri, 'w') as f:
        f.write(saved_model.uri.replace("saved_model",""))

    print(saved_model.uri)

    # Log the accuracy measured on the held-out split as a percentage, so it
    # is on the same 0-100 scale as the previously hard-coded 71.0.
    test_accuracy = float(model.score(X_test, y_test)) * 100.0
    print("accuracy: " + str(test_accuracy))
    accuracy.log_metric('accuracy', test_accuracy)

## 3. Eval component

In [189]:
@component()
def eval(baseline: float, accuracy: Input[Metrics], accuracy2: Input[Metrics], accuracy3: Input[Metrics]) -> bool:
    """Tell whether the accuracy logged in ``accuracy`` beats ``baseline``.

    Only the first metrics artifact is inspected; ``accuracy2`` and
    ``accuracy3`` are accepted but never read.

    Args:
        baseline: Threshold the logged accuracy must strictly exceed.
        accuracy: Metrics artifact whose ``metadata['accuracy']`` is compared.
        accuracy2: Unused.
        accuracy3: Unused.

    Returns:
        True when the logged accuracy is strictly greater than ``baseline``,
        False otherwise.
    """
    # Debug aid: list the attributes available on the metrics artifact.
    print(str(dir(accuracy)))

    candidate = float(accuracy.metadata['accuracy'])
    print(str(candidate))

    # Strict comparison: a tie with the baseline does not count as better.
    is_better = candidate > baseline
    print("isBetter: "+str(is_better))

    return is_better

In [190]:
from datetime import datetime

# Second-resolution timestamp (YYYYMMDDHHMMSS) appended to display names and
# to the compiled job-spec file name.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"

In [195]:
@kfp.dsl.pipeline(name="train-scikit" + str(uuid.uuid4()))
def pipeline(
    project: str = PROJECT_ID,
    bucket: str = BUCKET_NAME,
    baseline_accuracy: float = 70.0
):
    """Train three classifiers and conditionally deploy the first one.

    The pipeline exports the data, trains a model of each supported type,
    evaluates the accuracies against ``baseline_accuracy`` and, when the eval
    step returns true, uploads the model produced by the first training task,
    creates an endpoint and deploys the model to it.

    Args:
        project: GCP project passed to the training tasks and to the upload,
            endpoint-creation and deployment steps.
        bucket: Bucket name passed to the training tasks.
        baseline_accuracy: Threshold passed to the eval step.
    """
    preprocess_task = preprocess()

    # Use the pipeline parameters (not the notebook globals) so that values
    # overridden at run submission actually take effect.
    train_task = train(preprocess_task.output, model_type='linear_regression', project_id=project, bucket=bucket)
    train_task2 = train(preprocess_task.output, model_type='naive_bayes', project_id=project, bucket=bucket)
    train_task3 = train(preprocess_task.output, model_type='decision_tree', project_id=project, bucket=bucket)

    eval_task = eval(baseline_accuracy, train_task.outputs["accuracy"], train_task2.outputs["accuracy"], train_task3.outputs["accuracy"])

    # The eval step's boolean output is compared against the string "true".
    with dsl.Condition(eval_task.output == "true", name="eval models"):
        model_upload_op = gcc_aip.ModelUploadOp(
            project=project,
            display_name="model"+TIMESTAMP,
            artifact_uri=train_task.outputs["artifact_uri"], # GCS location of model
            serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest",
        )

        endpoint_create_op = gcc_aip.EndpointCreateOp(
            project=project,
            display_name="pipelines"+TIMESTAMP,
        )

        model_deploy_op = gcc_aip.ModelDeployOp(
            project=project,
            endpoint=endpoint_create_op.outputs["endpoint"],
            model=model_upload_op.outputs["model"],
            deployed_model_display_name="model_display_name",
            machine_type="n1-standard-4",
        )

In [196]:
from kfp.v2 import compiler

# Compile the `pipeline` function into a JSON job spec; the file name embeds
# TIMESTAMP and must match the name used when the run is created below.
compiler.Compiler().compile(pipeline_func=pipeline, 
                            package_path="dag-"+TIMESTAMP+".json")

In [197]:
from kfp.v2.google.client import AIPlatformClient

# Client used to submit the compiled job spec, bound to the project and region
# configured at the top of the notebook.
api_client = AIPlatformClient(
    project_id=PROJECT_ID,
    region=REGION,
)

In [198]:
# Submit the compiled job spec as a pipeline run. No parameter values are
# overridden, so the defaults declared on `pipeline` apply.
response = api_client.create_run_from_job_spec(
    "dag-"+TIMESTAMP+".json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={},
)