# How to use Google Cloud Vertex AI to build an ML pipeline with Kubeflow

Hello everyone! Welcome to this step-by-step tutorial on running your first model with Kubeflow Pipelines. For more details about each step, we recommend reading our [Medium article](); for a simpler version, you can also check out the other article we wrote [here](https://medium.com/@outsidenoxvodafone/running-your-first-ml-model-using-gcp-on-vertex-ai-1535b6732c6c).

### **Step 0: Install the required packages**

In [2]:
!pip install kfp==1.8.14
!pip install google-cloud-aiplatform==1.18.1
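Because the rest of the notebook relies on these exact versions (for example, the `kfp.v2` module layout of KFP 1.8), it can help to confirm what actually ended up in the kernel. Below is a minimal sketch using the standard library's `importlib.metadata`; `check_pins` is a hypothetical helper, not part of kfp or the Vertex AI SDK.

```python
from importlib import metadata

# Pins used in the pip commands above
PINS = {"kfp": "1.8.14", "google-cloud-aiplatform": "1.18.1"}


def check_pins(pins):
    """Hypothetical helper: return {package: (expected, found)} for every unmet pin.

    `found` is None when the package is not installed at all.
    """
    mismatches = {}
    for name, expected in pins.items():
        try:
            found = metadata.version(name)
        except metadata.PackageNotFoundError:
            found = None
        if found != expected:
            mismatches[name] = (expected, found)
    return mismatches


problems = check_pins(PINS)
if problems:
    print("Version mismatches:", problems)
else:
    print("All pinned packages are installed.")
```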

### **Step 1: Import the necessary libraries**

In [44]:
import json
from typing import NamedTuple
from datetime import datetime
import google.cloud.aiplatform as aiplatform
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import (component, Input, Model, Output, Dataset, Artifact,
                        OutputPath, ClassificationMetrics, Metrics, InputPath)

### **Step 2: Define variables**

* Define the pipeline variables used to set up the environment


In [5]:
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

PROJECT_ID = "<your-project-id>"  # fill in your project ID
LOCATION = "<your-region>"  # e.g. "europe-west1"; used for Vertex AI and the BigQuery dataset
PIPELINE_ROOT = "gs://<your-bucket>/pipeline_root"  # where the pipeline's artifacts are stored
SERVICE_ACCOUNT = "<your-service-account-email>"  # service account used to run the pipeline
PIPELINE_NAME = "medium-article"
JOBID = f"training-pipeline-{TIMESTAMP}"
ENABLE_CACHING = False
TEMPLATE_PATH = "medium_pipeline.json"
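Most submission errors come from one of these settings being left as a placeholder. Below is a minimal sketch of a pre-flight check; `validate_settings` is a hypothetical helper. The job ID rule it applies (start with a letter, then only lowercase letters, digits and hyphens, at most 128 characters) is our assumption about what Vertex AI accepts, so please verify it against the Vertex AI documentation.

```python
import re
from datetime import datetime

# Assumption: Vertex AI pipeline job IDs must start with a letter and contain only
# lowercase letters, digits and hyphens (max 128 characters). Verify against the docs.
JOB_ID_PATTERN = re.compile(r"^[a-z][-a-z0-9]{0,127}$")


def validate_settings(project_id, location, pipeline_root, job_id):
    """Hypothetical helper: return a list of problems found in the settings."""
    problems = []
    for name, value in [("PROJECT_ID", project_id), ("LOCATION", location)]:
        # Empty values or "<...>" placeholders were never filled in
        if not value or "<" in value:
            problems.append(f"{name} is still a placeholder")
    if not pipeline_root.startswith("gs://"):
        problems.append("PIPELINE_ROOT should be a gs:// URI")
    if not JOB_ID_PATTERN.match(job_id):
        problems.append(f"JOBID {job_id!r} is not a valid job ID")
    return problems


job_id = f"training-pipeline-{datetime.now().strftime('%Y%m%d%H%M%S')}"
print(validate_settings("my-project", "europe-west1", "gs://my-bucket/pipeline_root", job_id))
```

In the notebook you would call `validate_settings(PROJECT_ID, LOCATION, PIPELINE_ROOT, JOBID)` and expect an empty list before moving on.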

In [4]:
DATASET_ID = "medium_article"
TABLE_ID = "temp_table_medium"
COL_LABEL = "class" 
COL_TRAINING=["sepal_length", "sepal_width", "petal_length", "petal_width"]

PIPELINE_PARAMS = {"project_id": PROJECT_ID,
                   "dataset_location": LOCATION,
                   "table_id": TABLE_ID,
                   "dataset_id": DATASET_ID,
                   "col_label": COL_LABEL,
                   "col_training": COL_TRAINING}

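The keys of `PIPELINE_PARAMS` have to match the argument names of the pipeline function defined in Step 4, otherwise the submission fails or a parameter silently keeps its default. Below is a minimal sketch that compares the two with `inspect.signature`; `check_pipeline_params` and `demo_pipeline` are hypothetical stand-ins, and the sample values are illustrative.

```python
import inspect


def check_pipeline_params(pipeline_func, params):
    """Hypothetical helper: compare a parameter dict with a function's arguments.

    Returns (missing, unexpected): arguments without a value, and keys the function does not accept.
    """
    expected = set(inspect.signature(pipeline_func).parameters)
    missing = sorted(expected - set(params))
    unexpected = sorted(set(params) - expected)
    return missing, unexpected


# Stand-in with the same argument names as the pipeline defined in Step 4
def demo_pipeline(project_id, dataset_location, dataset_id, table_id, col_label, col_training):
    pass


params = {"project_id": "my-project", "dataset_location": "europe-west1",
          "table_id": "temp_table_medium", "dataset_id": "medium_article",
          "col_label": "class", "col_training": ["sepal_length", "sepal_width"]}
print(check_pipeline_params(demo_pipeline, params))  # ([], [])
```

Once Step 4 has run, you could try passing `medium_pipeline` itself, assuming the `dsl.pipeline` decorator preserves the function's signature in your KFP version.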
* Define the Docker base image. For other images, see: https://cloud.google.com/deep-learning-containers/docs/choosing-container

In [29]:
BASE_IMAGE = "gcr.io/deeplearning-platform-release/xgboost-cpu"

* Define the package versions

In [47]:
GCP_BIGQUERY = "google-cloud-bigquery==2.30.0"
PANDAS = "pandas==1.3.2"
SKLEARN = "scikit-learn==1.0.2"
NUMPY = "numpy==1.21.6"

### **Step 3: Building the components**

The next step is to build each component separately; each one has a specific purpose in the pipeline.

#### 1. INGEST DATA FROM BIGQUERY

In [31]:
@component(base_image=BASE_IMAGE, packages_to_install=[GCP_BIGQUERY])
def query_to_table(
    query: str,
    project_id: str,
    dataset_id: str,
    table_id: str,
    location: str = "EU",
    query_job_config: dict = None,
) -> None:
    """
    Run the query and create a new BigQuery table
    """
    
    import google.cloud.bigquery as bq
    
    # Configure your query job (query_job_config may be None, so fall back to an empty dict)
    job_config = bq.QueryJobConfig(destination=f"{project_id}.{dataset_id}.{table_id}", 
                                   **(query_job_config or {}))
    
    # Initiate the BigQuery client to connect with the project
    bq_client = bq.Client(project=project_id, 
                          location=location)
    
    # Run the query with all the job configurations and wait for it to finish,
    # so the destination table exists before the next component runs
    query_job = bq_client.query(query, 
                                job_config=job_config)
    query_job.result()
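The pipeline passes `{"write_disposition": "WRITE_TRUNCATE"}` as `query_job_config`, so each run overwrites the destination table instead of appending to it. Below is a minimal, dependency-free sketch of how those keyword arguments are assembled; `build_query_job_kwargs` is a hypothetical helper that only mirrors the dictionary handed to `bq.QueryJobConfig`, and it shows why the `None` default needs a fallback (`**None` raises `TypeError`).

```python
def build_query_job_kwargs(project_id, dataset_id, table_id, query_job_config=None):
    """Hypothetical helper mirroring how query_to_table builds its QueryJobConfig kwargs."""
    destination = f"{project_id}.{dataset_id}.{table_id}"
    # `**None` would raise TypeError, so use an empty dict when no config is given
    return {"destination": destination, **(query_job_config or {})}


print(build_query_job_kwargs("my-project", "medium_article", "temp_table_medium",
                             {"write_disposition": "WRITE_TRUNCATE"}))
```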

#### 2. EXPORT DATA FROM BIGQUERY TO GOOGLE CLOUD STORAGE

In [None]:
@component(base_image=BASE_IMAGE, packages_to_install=[GCP_BIGQUERY])
def extract_table_to_gcs(
    project_id: str,
    dataset_id: str,
    table_id: str,
    dataset: Output[Dataset],
    location: str = "EU",
) -> None:
    """
    Extract a BigQuery table into Google Cloud Storage.
    """

    import google.cloud.bigquery as bq

    # Get the table generated by the previous component
    full_table_id = f"{project_id}.{dataset_id}.{table_id}"
    table = bq.table.Table(table_ref=full_table_id)

    # Initiate the BigQuery client to connect with the project
    job_config = bq.job.ExtractJobConfig()
    client = bq.client.Client(project=project_id, location=location)

    # Submit the extract job (CSV with a header row by default) and wait for it
    # to finish, so the file exists before the next component reads it
    extract_job = client.extract_table(table, dataset.uri, job_config=job_config)
    extract_job.result()
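The extract job writes to `dataset.uri`, a `gs://` URI, because BigQuery writes to Cloud Storage directly, while the next component reads `data_input.path` with pandas. As far as we know, KFP v2 on Vertex AI exposes the same location as a local file path under the Cloud Storage FUSE mount `/gcs/`. Below is a minimal sketch of that mapping; `gcs_uri_to_local_path` is a hypothetical helper and the example URI is made up.

```python
def gcs_uri_to_local_path(uri):
    """Sketch of the gs:// -> /gcs/ mapping that KFP v2 artifacts appear to use for `.path`."""
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"Not a GCS URI: {uri}")
    return "/gcs/" + uri[len(prefix):]


# Hypothetical artifact URI under the pipeline root
uri = "gs://my-bucket/pipeline_root/123/extract-table-to-gcs/dataset"
print(gcs_uri_to_local_path(uri))  # /gcs/my-bucket/pipeline_root/123/extract-table-to-gcs/dataset
```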

#### 3. CREATE THE TRAINING AND TEST SET

In [49]:
@component(
    base_image=BASE_IMAGE, packages_to_install=[PANDAS, SKLEARN]
)
def create_sets(
    data_input: Input[Dataset],
    dataset_train: OutputPath(),
    dataset_test: OutputPath(),
    col_label: str,
    col_training: list
    ) -> NamedTuple("Outputs", [("dict_keys", dict), ("shape_train", int), ("shape_test", int)]):
    
    
    """
    Split data into train and test set.
    """

    import logging
    import pickle

    import pandas as pd
    from sklearn import model_selection
    

    def convert_labels_to_categories(labels):
        """
        Return two dictionaries that encode the labels.
        :returns: A tuple (dic_vals, dic_keys) mapping label -> index and index -> label
        """
        try:
            dic_keys = {k: label for k, label in enumerate(sorted(labels.unique()))}
            dic_vals = {label: k for k, label in enumerate(sorted(labels.unique()))}
            return dic_vals, dic_keys
        except Exception as e:
            print(f'[ERROR] Something went wrong that is {e}')
        return {}, {}


    df_ = pd.read_csv(data_input.path)
    
    df_.dropna(inplace=True)

    logging.info(f"[START] CREATE SETS, starts with an initial shape of {df_.shape}")

    if len(df_) != 0:

        yy = df_[col_label]
        dic_vals, dic_keys = convert_labels_to_categories(yy)

        yy = yy.apply(lambda v: dic_vals[v])
        xx = df_[col_training]

        x_train, x_test, y_train, y_test = model_selection.train_test_split(xx, yy, test_size=0.2, random_state=0, stratify=yy)

        x_train_results = {'x_train': x_train, 'y_train': y_train}
        x_test_results = {'x_test': x_test, 'y_test': y_test}

        with open(dataset_train + ".pkl", 'wb') as file:
            pickle.dump(x_train_results, file)

        with open(dataset_test + ".pkl", 'wb') as file:
            pickle.dump(x_test_results, file)

        logging.info(f"[END] CREATE SETS, dataset was split")

        return (dic_keys, len(x_train), len(x_test))

    else:
        logging.error(f"[END] CREATE SETS, dataset is empty")
        return (None, None, None)
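The `dict_keys` output returned by `create_sets` is what later turns encoded predictions back into class names. Pipeline outputs travel between components as JSON, and JSON object keys are always strings, so the integer codes come back as `"0"`, `"1"`, `"2"`. Below is a minimal sketch of that round trip with illustrative Iris labels (the real values come from your `class` column); it reproduces the encoding in `convert_labels_to_categories` without pandas.

```python
import json

# Illustrative labels; the real ones come from the `class` column in BigQuery
labels = ["Iris-virginica", "Iris-setosa", "Iris-versicolor", "Iris-setosa"]

# Same encoding as convert_labels_to_categories: sorted unique labels -> integer codes
unique_sorted = sorted(set(labels))
dic_vals = {label: k for k, label in enumerate(unique_sorted)}  # label -> code
dic_keys = {k: label for k, label in enumerate(unique_sorted)}  # code -> label

encoded = [dic_vals[label] for label in labels]

# Serializing to JSON and back turns the integer keys into strings
dic_keys_roundtrip = json.loads(json.dumps(dic_keys))

# So decoding has to look codes up as strings
decoded = [dic_keys_roundtrip[str(code)] for code in encoded]
print(encoded)             # [2, 0, 1, 0]
print(dic_keys_roundtrip)  # {'0': 'Iris-setosa', '1': 'Iris-versicolor', '2': 'Iris-virginica'}
```

This is why the evaluation component casts the encoded labels with `.astype(str)` before mapping them through `dict_keys`.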

#### 4. LET'S TRAIN THE MODEL!

In [None]:
@component(
    base_image=BASE_IMAGE, packages_to_install=[SKLEARN, PANDAS]
)
def train_model(
    training_data: InputPath(),
    model: Output[Model],
) -> None:

    """
    Train a classification model.
    """
        
    import logging
    import os
    import pickle

    import joblib
    import numpy as np
    from sklearn.linear_model import LogisticRegression

    logging.getLogger().setLevel(logging.INFO)

    with open(training_data + ".pkl", 'rb') as file:
        train_data = pickle.load(file)

    X_train = train_data['x_train']
    y_train = train_data['y_train']
    
    logging.info(f"X_train shape {X_train.shape}")
    logging.info(f"y_train shape {y_train.shape}")
    
    logging.info("Starting Training...")

    clf = LogisticRegression(n_jobs=-1, random_state=42)
    train_model = clf.fit(X_train, y_train)

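    # model.path is the local (/gcs/...) mount of the model artifact's GCS location;
    # create it as a directory so model.joblib can be written inside it
    os.makedirs(model.path, exist_ok=True)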

    logging.info(f"Save model to: {model.path}")
    joblib.dump(train_model, model.path + "/model.joblib")
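`create_sets` and `train_model` never share memory; they only agree on a file convention: the split is pickled as a dict to `<path>.pkl`, with keys such as `x_train` and `y_train`. The `.pkl` suffix is this tutorial's own convention, not something KFP requires. Below is a minimal sketch of that contract using plain lists in place of DataFrames; `save_split` and `load_split` are hypothetical helpers and the temporary directory stands in for the path KFP provides.

```python
import os
import pickle
import tempfile


def save_split(path, features, labels, prefix):
    """Write a split the way create_sets does: a dict pickled to `path + ".pkl"`."""
    with open(path + ".pkl", "wb") as file:
        pickle.dump({f"x_{prefix}": features, f"y_{prefix}": labels}, file)


def load_split(path, prefix):
    """Read it back the way train_model and predict_model do."""
    with open(path + ".pkl", "rb") as file:
        data = pickle.load(file)
    return data[f"x_{prefix}"], data[f"y_{prefix}"]


with tempfile.TemporaryDirectory() as tmp:
    # Stand-in for the OutputPath/InputPath string KFP hands to both components
    artifact_path = os.path.join(tmp, "dataset_train")
    save_split(artifact_path, [[5.1, 3.5, 1.4, 0.2]], [0], prefix="train")
    x_train, y_train = load_split(artifact_path, prefix="train")
    print(x_train, y_train)  # [[5.1, 3.5, 1.4, 0.2]] [0]
```

With real DataFrames, the reading component also needs pandas installed, since unpickling a DataFrame imports pandas.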

#### 5. CREATE SOME PREDICTIONS

In [None]:
@component(
    base_image=BASE_IMAGE, packages_to_install=[PANDAS, SKLEARN]
)
def predict_model(
    test_data: InputPath(),
    model: Input[Model],
    predictions: Output[Dataset],
) -> None:
    
    
    """
    Create the predictions of the model.
    """    

    import logging
    import os
    import pickle

    import joblib
    import pandas as pd

    logging.getLogger().setLevel(logging.INFO)


    with open(test_data + ".pkl", 'rb') as file:
        test_data = pickle.load(file)

    X_test = test_data['x_test']
    y_test = test_data['y_test']

    # load model
    model_path = os.path.join(model.path, "model.joblib")
    model = joblib.load(model_path)

    y_pred = model.predict(X_test)

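    # Collect the inputs, true labels, and predictions in one DataFrame
    # (X_test is a DataFrame, so convert it through .values to get a list of rows)
    df = pd.DataFrame({
        'input': X_test.values.tolist(),
        'class_true': y_test.tolist(),
        'class_pred': y_pred.tolist()}
    )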

    # save dataframe (feature, labels if provided, predictions)
    df.to_csv(predictions.path, sep=",", header=True, index=False)
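One side effect of saving the predictions as CSV: each list in the `input` column is written as its string representation, so whoever reads the file back gets strings, not lists. `evaluation_metrics` only uses `class_true` and `class_pred`, so it is unaffected, but any later analysis of `input` has to parse it again. Below is a minimal sketch of the round trip with the standard `csv` module and `ast.literal_eval`; the rows are illustrative.

```python
import ast
import csv
import io

# Illustrative rows shaped like the predictions DataFrame
rows = [
    {"input": [5.1, 3.5, 1.4, 0.2], "class_true": 0, "class_pred": 0},
    {"input": [6.7, 3.0, 5.2, 2.3], "class_true": 2, "class_pred": 1},
]

# Write the rows as CSV; the csv module stores each list via str()
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["input", "class_true", "class_pred"])
writer.writeheader()
writer.writerows(rows)

# Read it back: every field is now a string, so the lists need to be parsed again
buffer.seek(0)
parsed = []
for record in csv.DictReader(buffer):
    parsed.append({
        "input": ast.literal_eval(record["input"]),
        "class_true": int(record["class_true"]),
        "class_pred": int(record["class_pred"]),
    })
print(parsed == rows)  # True
```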

#### 6. CREATE EVALUATION METRICS COMPONENT

In [None]:
@component(
    base_image=BASE_IMAGE, packages_to_install=[PANDAS, NUMPY, SKLEARN]
)
def evaluation_metrics(
    predictions: Input[Dataset],
    metrics_names: list,
    dict_keys: dict,
    metrics: Output[ClassificationMetrics],
    kpi: Output[Metrics],
    eval_metrics: Output[Metrics]
) -> None:
    
    """
    Create the evaluation metrics.
    """ 

    import json
    import logging
    from importlib import import_module

    import numpy as np
    import pandas as pd

    results = pd.read_csv(predictions.path)
    
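    # dict_keys arrives with string keys ("0", "1", ...) because pipeline outputs are
    # passed as JSON, so cast the encoded labels to str before mapping them to class names
    results['class_true_clean'] = results['class_true'].astype(str).map(dict_keys)
    results['class_pred_clean'] = results['class_pred'].astype(str).map(dict_keys)

    module = import_module("sklearn.metrics")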
    metrics_dict = {}

    for each_metric in metrics_names:
        metric_func = getattr(module, each_metric)

        if each_metric == 'f1_score':
            metric_val = metric_func(results['class_true'], results['class_pred'], average=None)
        else:
            metric_val = metric_func(results['class_true'], results['class_pred'])

        # Save metric name and value
        metric_val = np.round(np.average(metric_val), 4)
        metrics_dict[f"{each_metric}"] = metric_val
        kpi.log_metric(f"{each_metric}", metric_val)
        
        # dumping kpi metadata
        with open(kpi.path, "w") as f:
            json.dump(kpi.metadata, f)
        logging.info(f"{each_metric}: {metric_val:.3f}")

    # dumping metrics_dict
    with open(eval_metrics.path, "w") as f:
        json.dump(metrics_dict, f)

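    # Pass the labels explicitly so the matrix rows/columns follow the category order
    confusion_matrix_func = getattr(module, "confusion_matrix")
    labels = list(dict_keys.values())
    metrics.log_confusion_matrix(
        labels,
        confusion_matrix_func(results['class_true_clean'], results['class_pred_clean'],
                              labels=labels).tolist(),
    )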

    # dumping metrics metadata
    with open(metrics.path, "w") as f:
        json.dump(metrics.metadata, f)
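To make the numbers shown in the Vertex AI UI easier to interpret, here is a dependency-free sketch of what this component computes: `accuracy_score`, and `f1_score(..., average=None)` followed by `np.average`, which amounts to the unweighted mean of the per-class F1 scores (macro F1). The confusion matrix uses the same convention as scikit-learn: rows are true labels, columns are predicted labels. The helper names and sample labels are illustrative.

```python
def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def per_class_f1(y_true, y_pred, labels):
    """F1 per class, F1 = 2*TP / (2*TP + FP + FN), 0.0 when the class never appears."""
    scores = []
    for label in labels:
        tp = sum(t == label and p == label for t, p in zip(y_true, y_pred))
        fp = sum(t != label and p == label for t, p in zip(y_true, y_pred))
        fn = sum(t == label and p != label for t, p in zip(y_true, y_pred))
        denom = 2 * tp + fp + fn
        scores.append(2 * tp / denom if denom else 0.0)
    return scores


def confusion(y_true, y_pred, labels):
    """Rows are true labels and columns are predicted labels, in the order given."""
    index = {label: i for i, label in enumerate(labels)}
    matrix = [[0] * len(labels) for _ in labels]
    for t, p in zip(y_true, y_pred):
        matrix[index[t]][index[p]] += 1
    return matrix


y_true = [0, 0, 1, 1, 2, 2]
y_pred = [0, 0, 1, 2, 2, 2]
labels = [0, 1, 2]

f1_scores = per_class_f1(y_true, y_pred, labels)
# Unweighted mean of the per-class scores, rounded like np.round(..., 4)
macro_f1 = round(sum(f1_scores) / len(f1_scores), 4)
print(accuracy(y_true, y_pred))            # 0.8333333333333334
print(macro_f1)                            # 0.8222
print(confusion(y_true, y_pred, labels))   # [[2, 0, 0], [0, 1, 1], [0, 0, 2]]
```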


### **Step 4: Build the Kubeflow pipeline with all the components**

In [33]:
@dsl.pipeline(name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT)
def medium_pipeline(
    project_id: str,
    dataset_location: str,
    dataset_id: str,
    table_id: str,
    col_label: str,
    col_training: list):
    
    # Replace project-id with your own project ID
    QUERY = """SELECT * FROM `project-id.medium_article.iris_dataset`"""
    METRICS_NAMES = ["accuracy_score", "f1_score"]
    
    ingest = query_to_table(query=QUERY,
                            table_id=table_id,
                            project_id=project_id,
                            dataset_id=dataset_id,
                            location=dataset_location,
                            query_job_config=json.dumps(dict(write_disposition="WRITE_TRUNCATE"))
                           ).set_display_name("Ingest Data")
    
    # Export the BigQuery table to GCS
    ingested_dataset = (
                        extract_table_to_gcs(
                            project_id=project_id,
                            dataset_id=dataset_id,
                            table_id=table_id,
                            location=dataset_location,
                        )
                        .after(ingest)
                        .set_display_name("Extract Big Query to GCS")
                    )
    
    # Split data
    split_data = create_sets(data_input=ingested_dataset.outputs["dataset"],
                             col_label=col_label,
                             col_training=col_training
                            ).set_display_name("Split data")
    
    # Train model
    training_model = train_model(
        training_data=split_data.outputs['dataset_train']).set_display_name("Train Model")
    
    # Predict with the trained model
    predict_data = predict_model(
                test_data=split_data.outputs['dataset_test'],
                model=training_model.outputs["model"]
            ).set_display_name("Create Predictions")
    
    
    # Evaluate model
    eval_metrics = evaluation_metrics(
        predictions=predict_data.outputs['predictions'],
        dict_keys=split_data.outputs['dict_keys'],
        metrics_names=json.dumps(METRICS_NAMES),
        ).set_display_name("Evaluation Metrics")
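KFP derives the execution order from the data passed between tasks, plus any explicit `.after(...)` calls. `query_to_table` returns no output, so without `.after(ingest)` nothing would connect it to the export task, and Vertex AI could start the export before the table exists. Below is a minimal sketch of the resulting dependency graph, using the standard library's `graphlib` (Python 3.9+) to compute a valid run order; the graph is written by hand here and is not read from KFP.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Each task maps to the set of tasks it waits for, mirroring the pipeline above:
# data dependencies (outputs passed as inputs) plus the explicit .after(ingest)
dependencies = {
    "Ingest Data": set(),
    "Extract Big Query to GCS": {"Ingest Data"},
    "Split data": {"Extract Big Query to GCS"},
    "Train Model": {"Split data"},
    "Create Predictions": {"Split data", "Train Model"},
    "Evaluation Metrics": {"Split data", "Create Predictions"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Because each step consumes the previous step's output, this particular graph has only one valid order, so the tasks run strictly one after another.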
    

### **Step 5: Compile the Pipeline**

In [10]:
compiler.Compiler().compile(
    pipeline_func=medium_pipeline,
    package_path=TEMPLATE_PATH)
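A quick way to double-check the compiled template is to list its pipeline-level input parameters and compare them with `PIPELINE_PARAMS`. Below is a minimal sketch; `pipeline_input_names` and `input_names_from_file` are hypothetical helpers, and the JSON layout they assume (spec nested under `pipelineSpec` in KFP 1.8's v2 output, with inputs under `root.inputDefinitions.parameters`) is our understanding, not a documented guarantee, so adjust the keys if your compiled file differs.

```python
import json


def pipeline_input_names(spec):
    """Return the pipeline-level input parameter names from a compiled spec dict.

    Assumption: KFP 1.8's v2 compiler nests the spec under "pipelineSpec"; a bare
    spec has "root" at the top level.
    """
    pipeline_spec = spec.get("pipelineSpec", spec)
    parameters = pipeline_spec["root"].get("inputDefinitions", {}).get("parameters", {})
    return sorted(parameters)


def input_names_from_file(path):
    """Load a compiled JSON template from disk and list its input parameters."""
    with open(path) as f:
        return pipeline_input_names(json.load(f))


# Hypothetical, heavily trimmed example of a compiled template
example_spec = {
    "pipelineSpec": {
        "root": {"inputDefinitions": {"parameters": {
            "project_id": {"parameterType": "STRING"},
            "col_training": {"parameterType": "LIST"},
        }}}
    }
}
print(pipeline_input_names(example_spec))  # ['col_training', 'project_id']
```

In the notebook, `input_names_from_file(TEMPLATE_PATH)` should list the same names as `sorted(PIPELINE_PARAMS)`.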



### **Step 6: Let's submit this pipeline!!**

In [11]:
aiplatform.init(project=PROJECT_ID, location=LOCATION)

In [12]:
pipeline_ = aiplatform.pipeline_jobs.PipelineJob(
    enable_caching=ENABLE_CACHING,
    display_name=PIPELINE_NAME,
    template_path=TEMPLATE_PATH,
    job_id=JOBID,
    parameter_values=PIPELINE_PARAMS)

pipeline_.submit(service_account=SERVICE_ACCOUNT)

Creating PipelineJob
PipelineJob created. Resource name: projects/737341566193/locations/europe-west1/pipelineJobs/training-pipeline-20221117093807
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/737341566193/locations/europe-west1/pipelineJobs/training-pipeline-20221117093807')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west1/pipelines/runs/training-pipeline-20221117093807?project=737341566193
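The resource name printed above encodes the project number, the region and the job ID, and the console link is built from the same three pieces. If you only kept the resource name, a small parser can recover them; `aiplatform.PipelineJob.get(...)` (as shown in the output) is what fetches the job object itself, while this sketch only handles the string. `parse_pipeline_job_name` and `console_url` are hypothetical helpers.

```python
import re

RESOURCE_PATTERN = re.compile(
    r"^projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)"
    r"/pipelineJobs/(?P<job_id>[^/]+)$"
)


def parse_pipeline_job_name(resource_name):
    """Split a PipelineJob resource name into its project, location and job ID parts."""
    match = RESOURCE_PATTERN.match(resource_name)
    if match is None:
        raise ValueError(f"Unexpected resource name: {resource_name}")
    return match.groupdict()


def console_url(resource_name):
    """Rebuild the console link in the same format as the one printed above."""
    parts = parse_pipeline_job_name(resource_name)
    return ("https://console.cloud.google.com/vertex-ai/locations/"
            f"{parts['location']}/pipelines/runs/{parts['job_id']}?project={parts['project']}")


name = "projects/737341566193/locations/europe-west1/pipelineJobs/training-pipeline-20221117093807"
print(parse_pipeline_job_name(name))
print(console_url(name))
```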
