In [1]:
# pip flag used by the install cell: install into the per-user site directory.
USER_FLAG = '--user'

In [2]:
# Pin both SDKs. This notebook uses KFP 1.x-only APIs (kfp.v2.google.client.AIPlatformClient),
# which an unpinned `kfp --upgrade` would replace with KFP 2.x. kfp==1.8.2 is the version the
# recorded run below reports.
# `%pip` (unlike `!pip3`) installs into the environment of the running kernel.
%pip install {USER_FLAG} google-cloud-aiplatform==1.3.0 --upgrade
%pip install {USER_FLAG} kfp==1.8.2 --upgrade



In [3]:
# Print the KFP SDK version as seen by a separate python3 process.
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

KFP SDK version: 1.8.2


In [4]:
# Show which google-cloud-aiplatform version pip reports as installed.
!pip list | grep aiplatform

google-cloud-aiplatform        1.3.0


In [5]:
from typing import NamedTuple
import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component
from kfp.v2.google.client import AIPlatformClient
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        component,
                        OutputPath,
                        ClassificationMetrics)

from google.cloud.aiplatform import pipeline_jobs

import pandas as pd

In [82]:
# Placeholders: replace with your GCP project id, Vertex AI region and GCS bucket (gs://...).
PROJECT_ID ="XXXXXXXXXXXX"
REGION = "XXXXXXXXXXXX"
BUCKET_NAME = "gs://XXXXXXXXXXXX"

# Append the per-user scripts directory (where `pip --user` puts console scripts) to PATH.
# NOTE: not idempotent. Every re-run of this cell appends the directory again.
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
# Only referenced by the commented-out per-user PIPELINE_ROOT variant below.
USER = "Thanga"
# GCS prefix passed to the pipeline as its pipeline_root.
PIPELINE_ROOT = "{}/pipeline_root".format(BUCKET_NAME)
# If there are multiple users, it is better to use username in path:
# PIPELINE_ROOT = "{}/pipeline_root/{}".format(BUCKET_NAME, USER)

PIPELINE_ROOT

env: PATH=/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin:/home/jupyter/.local/bin


'gs://micronbucket/pipeline_root'

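ML Pipeline

The pipeline ingests a BigQuery table, preprocesses it into train/test splits, trains a random-forest classifier and evaluates it on the test split. The model is deployed to a Vertex AI endpoint, and inference is run over the raw dataset, only when the evaluation step decides to deploy.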

In [114]:
@component(
    # scikit-learn is not used here. The deprecated "sklearn" PyPI alias also no longer installs.
    packages_to_install=["google-cloud-bigquery", "pandas", "pyarrow"],
    base_image="python:3.9",
    output_component_file="data_ingestion.yaml"
)
def data_ingestion(
    bq_table: str,
    dataset_raw: Output[Dataset]
):
    """Read a BigQuery table into a shuffled CSV dataset artifact.

    Args:
        bq_table: Table id parsed by ``bigquery.TableReference.from_string``
            (e.g. ``"project.dataset.table"``).
        dataset_raw: Output artifact. All rows, shuffled with a fixed seed,
            are written to its ``path`` as CSV without the pandas index.
    """
    # Imports live inside the function: KFP ships only this function's source
    # to the component container.
    import logging

    from google.cloud import bigquery

    bqclient = bigquery.Client()
    table = bigquery.TableReference.from_string(bq_table)
    rows = bqclient.list_rows(table)

    dataframe = rows.to_dataframe(create_bqstorage_client=True)

    # Shuffle every row (frac=1) with a fixed seed so reruns give the same order.
    dataframe = dataframe.sample(frac=1, random_state=2)

    dataframe.to_csv(dataset_raw.path, index=False)

    logging.info("ingested dataset shape is %s", dataframe.shape)

In [115]:
@component(
    packages_to_install=["scikit-learn", "pandas"],
    base_image="python:3.9",
    output_component_file="data_preprocessing.yaml",
)
def data_preprocessing(
    dataset_raw: Input[Dataset],
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset],
    test_size: float = 0.3,
    random_state: int = 42,
):
    """Clean, split and min-max scale the raw dataset.

    Steps:
      1. Drop rows whose ``target`` label is missing. Imputing a label with
         its mean would create fractional, non-existent classes.
      2. Split into train/test (``test_size`` fraction held out, seeded).
      3. Fill missing feature values with the *train* split's column means.
      4. Min-max scale features with a scaler fit on the *train* split only.
         Test rows are transformed with that scaler, so no test statistics
         leak into training.

    Args:
        dataset_raw: CSV with feature columns plus a ``target`` column.
        dataset_train: Output CSV (features..., target), no index.
        dataset_test: Output CSV (features..., target), no index.
        test_size: Fraction of rows held out for testing.
        random_state: Seed for the train/test split.

    Class rebalancing (e.g. SMOTE from imbalanced-learn) is not applied. If it
    is added later, apply it to the train split only.
    """
    import logging

    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import MinMaxScaler

    df = pd.read_csv(dataset_raw.path)
    df = df.dropna(subset=["target"])

    train, test = train_test_split(df, test_size=test_size, random_state=random_state)

    X_train = train.drop("target", axis=1)
    X_test = test.drop("target", axis=1)

    # Impute with train-only statistics.
    feature_means = X_train.mean(numeric_only=True)
    X_train = X_train.fillna(feature_means)
    X_test = X_test.fillna(feature_means)

    scaler = MinMaxScaler()
    # Keep the split's row index so the label column lines up in the concat below.
    X_train_scaled = pd.DataFrame(
        scaler.fit_transform(X_train), columns=X_train.columns, index=X_train.index
    )
    X_test_scaled = pd.DataFrame(
        scaler.transform(X_test), columns=X_test.columns, index=X_test.index
    )

    train_out = pd.concat([X_train_scaled, train["target"]], axis=1)
    test_out = pd.concat([X_test_scaled, test["target"]], axis=1)

    train_out.to_csv(dataset_train.path, index=False)
    test_out.to_csv(dataset_test.path, index=False)

    logging.info("train shape %s, test shape %s", train_out.shape, test_out.shape)

In [116]:
@component(
    packages_to_install=["scikit-learn", "pandas", "joblib"],
    base_image="python:3.9",
    output_component_file="model_training.yaml",
)
def model_training(
    dataset_train: Input[Dataset],
    model: Output[Model],
    random_state: int = 42):
    """Fit a random-forest classifier on the training split and save it.

    Args:
        dataset_train: CSV with feature columns plus a ``target`` label column.
        model: Output artifact. The fitted estimator is dumped with joblib to
            ``model.path + ".joblib"``. Training-set scores (in percent) and the
            row count are recorded in ``model.metadata``.
        random_state: Seed for the forest's per-split feature sampling, so
            reruns on the same data produce the same model.
    """
    import pandas as pd
    from joblib import dump
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import f1_score, precision_score, recall_score

    df = pd.read_csv(dataset_train.path)
    X_train = df.drop('target', axis=1)
    y_train = df['target']

    # 'sqrt' is what max_features='auto' meant for classifiers. 'auto' was
    # deprecated in scikit-learn 1.1 and removed in 1.3, where it raises.
    skmodel = RandomForestClassifier(
        min_samples_leaf=1,
        max_features='sqrt',
        criterion='entropy',
        bootstrap=False,
        random_state=random_state,
    )
    skmodel.fit(X_train, y_train)

    # In-sample scores on the data the model was fit on. These are optimistic,
    # not a generalization estimate.
    score = skmodel.score(X_train, y_train)
    y_pred = skmodel.predict(X_train)
    f1_score_value = f1_score(y_train, y_pred, average='weighted')
    precision_score_value = precision_score(y_train, y_pred, average='weighted')
    recall_value = recall_score(y_train, y_pred, average='weighted')

    # Attach metrics (percent) to the output artifact's metadata.
    model.metadata["Training_score"] = score * 100.0
    model.metadata["Training_DataSize"] = len(df)
    model.metadata["Training_f1_score"] = f1_score_value * 100.0
    model.metadata["Training_precision_score"] = precision_score_value * 100.0
    model.metadata["Training_recall_score"] = recall_value * 100.0
    # NOTE: key is misspelled ("Classificaton"); kept unchanged in case anything reads it by name.
    model.metadata["Classificaton Algorithm"] = "RandomForestClassifier"

    # "<path>.joblib" is the file name the downstream steps load.
    dump(skmodel, model.path + ".joblib")
    print(model.path)

In [117]:
@component(packages_to_install = ["pandas", "scikit-learn", "joblib"],
          base_image="python:3.9",
          output_component_file="evaluation_model.yaml",)
def evaluation_model(
    dataset_test: Input[Dataset],
    model: Input[Model],
    metrics: Output[Metrics],
    f1_threshold: float = 97.0,
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.
    """Score the trained model on the held-out split and decide whether to deploy.

    Args:
        dataset_test: CSV with feature columns plus a ``target`` label column.
        model: Trained model artifact. The estimator is loaded from
            ``model.path + ".joblib"``.
        metrics: Receives weighted F1, precision, recall and accuracy, in percent.
        f1_threshold: Minimum weighted F1 score (percent) required to deploy.

    Returns:
        ``("true",)`` when the weighted F1 reaches ``f1_threshold``,
        otherwise ``("false",)``.
    """
    import logging

    import joblib
    import pandas as pd
    from sklearn.metrics import f1_score, precision_score, recall_score

    df = pd.read_csv(dataset_test.path)
    X_test = df.drop('target', axis=1)
    y_test = df['target']

    skmodel = joblib.load(model.path + ".joblib")

    score = skmodel.score(X_test, y_test)
    y_pred = skmodel.predict(X_test)

    # 'weighted' averages per-class scores weighted by each class's support.
    f1_pct = f1_score(y_test, y_pred, average='weighted') * 100.0
    precision_pct = precision_score(y_test, y_pred, average='weighted') * 100.0
    recall_pct = recall_score(y_test, y_pred, average='weighted') * 100.0
    accuracy_pct = score * 100.0

    metrics.log_metric("f1_score", f1_pct)
    metrics.log_metric("precision", precision_pct)
    metrics.log_metric("recall", recall_pct)
    metrics.log_metric("accuracy", accuracy_pct)

    # NOTE(review): `model` is an *input* artifact here. KFP v2 appears to persist
    # metadata only for output artifacts, so these values may be dropped. Verify,
    # and prefer `metrics.log_metric` if they are needed.
    model.metadata["Testing_score"] = accuracy_pct
    model.metadata["Testing_DataSize"] = len(df)

    dep_decision = "true" if f1_pct >= f1_threshold else "false"

    logging.info("deployment decision is f1_score %f", f1_pct)
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

In [118]:
@component(packages_to_install = ["pandas", "scikit-learn", "joblib"],
          base_image="python:3.9",
          # Previously "evaluation_model.yaml", which overwrote the evaluation component's spec.
          output_component_file="model_inference.yaml",)
def model_inference(
    dataset_raw: Input[Dataset],
    model: Input[Model]):
    """Predict ``target`` for every row of the raw dataset with the trained model.

    Args:
        dataset_raw: CSV with feature columns plus a ``target`` column. The
            column is dropped before prediction.
        model: Trained model artifact. The estimator is loaded from
            ``model.path + ".joblib"``.

    The predictions are currently computed but not persisted (see TODO below).
    """
    import logging

    import joblib
    import pandas as pd
    from sklearn.preprocessing import MinMaxScaler

    df = pd.read_csv(dataset_raw.path)

    # Mean-impute numeric columns. numeric_only=True avoids a TypeError on
    # non-numeric columns in pandas 2.x, and matches the older default.
    df = df.fillna(df.mean(numeric_only=True))

    X = df.drop('target', axis=1)
    # NOTE(review): a fresh scaler is fit on this data instead of reusing the
    # scaler fit during preprocessing. Predictions are only consistent with
    # training if this data has the same per-feature min/max. Consider
    # persisting the training scaler as an artifact.
    X = pd.DataFrame(MinMaxScaler().fit_transform(X), columns=X.columns)

    skmodel = joblib.load(model.path + ".joblib")
    y_pred = skmodel.predict(X)

    # Features and predicted label side by side. Both share X's fresh RangeIndex.
    dataframe = pd.concat([X, pd.DataFrame(y_pred, columns=['target'])], axis=1)
    logging.info("inference results shape is %s", dataframe.shape)

    # TODO: persist `dataframe`. To write to BigQuery, add "google-cloud-bigquery"
    # and "pyarrow" to packages_to_install, then:
    #   from google.cloud import bigquery
    #   bqclient = bigquery.Client()
    #   job_config = bigquery.LoadJobConfig(
    #       write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE)
    #   bqclient.load_table_from_dataframe(
    #       dataframe, "microninterview.microndataset.inferenceresults",
    #       job_config=job_config).result()
    

In [119]:
@component(
    packages_to_install=["google-cloud-aiplatform"],
    base_image="python:3.9",
    output_component_file="deploy_model.yaml",
)
def deploy_model(
    model: Input[Model],
    project: str,
    region: str,
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model],
    machine_type: str = "n1-standard-4",
    serving_container_image_uri: str = "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest",
):
    """Upload the trained model to Vertex AI and deploy it to a new endpoint.

    Args:
        model: Trained model artifact. Its joblib file sits next to the artifact
            URI as ``<name>.joblib``.
        project: GCP project id passed to ``aiplatform.init``.
        region: Vertex AI location passed to ``aiplatform.init``.
        vertex_endpoint: Output. Its ``uri`` is set to the endpoint resource name.
        vertex_model: Output. Its ``uri`` is set to the uploaded model resource name.
        machine_type: Machine type used for the endpoint deployment.
        serving_container_image_uri: Prediction container image for the model.

    NOTE(review): the default image serves scikit-learn 0.24, while the training
    component installs an unpinned scikit-learn. joblib pickles are not
    guaranteed to load across scikit-learn versions. Pin training to match, or
    pick a matching prediction image.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=region)

    # Vertex loads the model from a directory, so pass the artifact's parent
    # directory. Taking everything up to the last "/" replaces `uri[:-5]`, which
    # silently assumed the artifact name is exactly five characters ("model").
    # The result is identical for the usual ".../model" URI.
    artifact_dir = model.uri.rsplit("/", 1)[0] + "/"

    uploaded_model = aiplatform.Model.upload(
        display_name="model-pipeline",
        artifact_uri=artifact_dir,
        serving_container_image_uri=serving_container_image_uri,
    )
    endpoint = uploaded_model.deploy(machine_type=machine_type)

    # Save data to the output params
    vertex_endpoint.uri = endpoint.resource_name
    vertex_model.uri = uploaded_model.resource_name

In [123]:
@dsl.pipeline(
    name="mlmd-pipeline",
    # Default artifact root; can be overridden when the pipeline is submitted.
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    bq_table: str = "microninterview.microndataset.micronmodeldata",
    output_data_path: str = "data.csv",
    project: str = PROJECT_ID,
    region: str = REGION
):
    """Ingest -> preprocess -> train -> evaluate, then conditionally deploy and infer.

    Deployment and inference run only when the evaluation step outputs
    ``dep_decision == "true"``. ``output_data_path`` is accepted but not used
    by any step.
    """
    ingest_task = data_ingestion(bq_table)
    raw_dataset = ingest_task.outputs["dataset_raw"]

    preprocess_task = data_preprocessing(raw_dataset)
    train_task = model_training(preprocess_task.outputs["dataset_train"])
    trained_model = train_task.outputs["model"]

    eval_task = evaluation_model(
        dataset_test=preprocess_task.outputs["dataset_test"],
        model=trained_model,
    )

    should_deploy = eval_task.outputs["dep_decision"] == "true"
    with dsl.Condition(should_deploy, name="deploy_decision"):
        deploy_model(model=trained_model, project=project, region=region)
        model_inference(raw_dataset, model=trained_model)


In [124]:
# Serialize the pipeline definition to a JSON job spec for submission.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path="mlmd_pipeline.json",
)

In [125]:
# Submit the compiled job spec to Vertex AI Pipelines with step caching disabled.
# NOTE(review): KFP 1.8 flags AIPlatformClient for deprecation in favour of
# google.cloud.aiplatform.PipelineJob (pipeline_jobs is already imported); consider migrating.
api_client = AIPlatformClient(
    project_id=PROJECT_ID,
    region=REGION,
)
response = api_client.create_run_from_job_spec(
    'mlmd_pipeline.json',
    enable_caching=False,
)