In [1]:
# IMPORT THE REQUIRED LIBRARIES

from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Output,
                        Model,
                        Metrics,
                        Markdown,
                        HTML,
                        component, 
                        OutputPath, 
                        InputPath)

from kfp import compiler
from google.cloud import aiplatform as vertex_
from google.cloud.aiplatform import pipeline_jobs

from datetime import datetime
import pandas as pd

In [2]:
# Google Cloud project ID and region used throughout this notebook.
PROJECT_ID = "id"
REGION = "us-central1"

# Cloud Storage layout: the bucket name is derived from the project ID.
BUCKET_NAME = f"{PROJECT_ID}-rb"
source_data_blob = "data/data.csv"  # object path of the raw CSV inside BUCKET_NAME
PIPELINE_ROOT = "gs://" + BUCKET_NAME + "/pipeline_root_rb/"


In [3]:
print(PIPELINE_ROOT)

gs://id-rb/pipeline_root_rb/


In [4]:
# Custom base image (built with Docker) that the pipeline components below
# pass as their `base_image`.
IMAGE_NAME = "training"
BASE_IMAGE = "{}-docker.pkg.dev/{}/rb/{}:latest".format(REGION, PROJECT_ID, IMAGE_NAME)
print(BASE_IMAGE)

us-central1-docker.pkg.dev/id-410816/rb/training:latest


In [32]:
import os

# Prompt for the service-account key path only when the environment does not
# already provide one.
_CREDENTIALS_ENV = "GOOGLE_APPLICATION_CREDENTIALS"
if os.environ.get(_CREDENTIALS_ENV) is None:
    key_path = input("Enter path to Google Cloud service account key JSON: ")
    os.environ[_CREDENTIALS_ENV] = key_path


In [33]:
from google.cloud import storage


def create_bucket_class_location(bucket_name, location="us-central1", storage_class="COLDLINE"):
    """Create a new Cloud Storage bucket and return it.

    Args:
        bucket_name: Name of the bucket to create.
        location: Location to create the bucket in. Defaults to
            "us-central1", the value this function previously hard-coded.
        storage_class: Storage class assigned to the bucket before it is
            created. Defaults to "COLDLINE", the previously hard-coded value.

    Returns:
        The bucket object returned by ``storage.Client.create_bucket``.
    """
    storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)
    bucket.storage_class = storage_class
    new_bucket = storage_client.create_bucket(bucket, location=location)

    print(
        "Created bucket {} in {} with storage class {}".format(
            new_bucket.name, new_bucket.location, new_bucket.storage_class
        )
    )
    return new_bucket


In [34]:
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a Cloud Storage bucket.

    Args:
        bucket_name: ID of the destination GCS bucket.
        source_file_name: Path of the local file to upload.
        destination_blob_name: ID of the GCS object to write.
    """
    target_blob = storage.Client().bucket(bucket_name).blob(destination_blob_name)

    # A generation-match precondition of 0 is the setting for a destination
    # object that does not exist yet; the upload is aborted if the object's
    # generation does not match. To replace an existing object, its current
    # generation number would have to be passed instead.
    target_blob.upload_from_filename(source_file_name, if_generation_match=0)

    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

In [35]:
import io

def write_read(bucket_name, blob_name):
    """Download a CSV object from GCS and load it into a pandas DataFrame.

    Args:
        bucket_name: ID of the GCS bucket that holds the object.
        blob_name: ID of the GCS object to read.

    Returns:
        The DataFrame parsed from the object's bytes.
    """
    source_blob = storage.Client().bucket(bucket_name).blob(blob_name)

    # The whole object is fetched into memory and parsed from an in-memory buffer.
    raw_bytes = source_blob.download_as_bytes()
    return pd.read_csv(io.BytesIO(raw_bytes))

    

In [36]:
# Use the configured blob path rather than repeating the "data/data.csv" literal.
df = write_read(BUCKET_NAME, source_data_blob)

In [37]:
df.head()

Unnamed: 0,courier_id,order_number,courier_location_timestamp,courier_lat,courier_lon,order_created_timestamp,restaurant_lat,restaurant_lon
0,a98737cbhoho5012hoho4b5bhoho867fhoho8475c658546d,281289453,2021-04-02T04:30:42.328Z,50.48452,-104.618876,2021-04-02T04:20:42Z,50.483696,-104.61435
1,39a26fa0hohof428hoho47a4hohoa320hoho12e3d831c23a,280949566,2021-04-01T06:14:47.386Z,50.442573,-104.550463,2021-04-01T06:05:18Z,50.442422,-104.550487
2,3813235ehoho7a42hoho4601hohob7eahoho799e8af5b535,281328578,2021-04-02T05:48:57.224Z,50.49592,-104.635605,2021-04-02T05:13:26Z,50.496595,-104.635606
3,9f033953hohocd53hoho488ahohoaf51hohoc57943e499ed,281317998,2021-04-02T05:12:17.252Z,50.449445,-104.611521,2021-04-02T04:59:57Z,50.449504,-104.611074
4,56f65bc8hohoba54hoho47dfhohoa09chohof7464b5d9848,281314132,2021-04-02T05:15:38.266Z,50.495254,-104.666383,2021-04-02T04:54:53Z,50.49516,-104.665733


In [43]:
#!pip install fsspec
#!pip install gcsfs

In [42]:
# Reading a gs:// URL directly with pandas relies on the optional fsspec/gcsfs
# packages (see the commented-out installs in the cell above).
df_train = pd.read_csv(f"gs://{BUCKET_NAME}/{source_data_blob}")

## Read the Dataset

In [56]:
@component(
    base_image=BASE_IMAGE,
    output_component_file="get_data.yaml"
)
def get_std_data(
    gcs_bucket: str,
    gcs_path: str,
    dataset_train: Output[Dataset],
):
    """Copy the raw CSV from Cloud Storage into the pipeline's dataset artifact.

    Args:
        gcs_bucket: Name of the bucket that holds the source CSV.
        gcs_path: Object path of the CSV inside ``gcs_bucket``.
        dataset_train: Output artifact; the data is written to its path as CSV
            without the index column.
    """
    import pandas as pd

    # pandas resolves gs:// URLs through fsspec/gcsfs, so the component image
    # needs gcsfs available.
    # NOTE(review): confirm gcsfs is installed in BASE_IMAGE.
    df = pd.read_csv(f"gs://{gcs_bucket}/{gcs_path}")

    df.to_csv(dataset_train.path, index=False)

  @component(


## Data Preprocessing

In [57]:
@component(
    base_image=BASE_IMAGE,
    output_component_file="preprocessing.yaml"
)
def preprocess_std_data(
    train_df: Input[Dataset],
    dataset_train_preprocessed: Output[Dataset],
):
    """Apply the project's preprocessing pipeline to a CSV dataset artifact.

    Args:
        train_df: Input artifact whose path holds the raw CSV.
        dataset_train_preprocessed: Output artifact; the preprocessed frame is
            written to its path as CSV without the index column.
    """
    import pandas as pd
    from src.prepare_data.preprocessing import data_preprocessing_pipeline

    raw_frame = pd.read_csv(train_df.path)
    data_preprocessing_pipeline(raw_frame).to_csv(
        dataset_train_preprocessed.path, index=False
    )

  @component(


## Train Test Split

In [58]:
@component(
    base_image=BASE_IMAGE,
    output_component_file="train_test_split.yaml",
)
def train_test_split(dataset_in: Input[Dataset],
                     dataset_train: Output[Dataset],
                     dataset_test: Output[Dataset],
                     test_size: float = 0.33):
    """Split a CSV dataset artifact into train and test CSV artifacts.

    Args:
        dataset_in: Input artifact whose path holds the CSV to split.
        dataset_train: Output artifact that receives the training rows.
        dataset_test: Output artifact that receives the test rows.
        test_size: Forwarded to scikit-learn's ``train_test_split``.
    """
    import pandas as pd
    # Aliased so the scikit-learn helper is not confused with this component.
    from sklearn.model_selection import train_test_split as split_frame

    full_frame = pd.read_csv(dataset_in.path)
    # random_state is fixed at 42, as in every previous run of this component.
    partitions = split_frame(full_frame, test_size=test_size, random_state=42)

    for partition, artifact in zip(partitions, (dataset_train, dataset_test)):
        partition.to_csv(artifact.path, index=False)

  @component(


## Train the Model

In [59]:
@component(
    base_image=BASE_IMAGE,
    output_component_file="model_training.yaml"
)
def train_busyness(
    dataset_train: Input[Dataset],
    dataset_test: Input[Dataset],
    model: Output[Model],
    best_params: Output[Markdown],
):
    """Train the busyness model and store it, plus its best hyperparameters, as artifacts.

    Args:
        dataset_train: Input artifact whose path holds the training CSV.
        dataset_test: Input artifact whose path holds the test CSV.
        model: Output artifact; receives a pickled dict with the keys
            "pipeline", "target" and "scores_dict".
        best_params: Output artifact; receives the string form of the fitted
            model's ``best_params``. It was previously referenced in the body
            without being declared, which raised ``NameError`` at runtime.
    """
    import pandas as pd
    import pickle
    from src.train.model import BusynessEstimation
    from src.train.config import Data

    # Read train and test data
    train_data = pd.read_csv(dataset_train.path)
    test_data = pd.read_csv(dataset_test.path)

    # Instantiate the model class with a copy of the test split
    busyness_model = BusynessEstimation(test_data.copy())

    # Create X_train and y_train
    X_train = train_data.drop(Data.target, axis=1)
    y_train = train_data[Data.target]

    # Fit the model (training pipeline consists of feature engineering,
    # feature selection and training an xgboost model)
    busyness_model.fit(X_train, y_train)

    # Save the best hyperparameters as an artifact
    with open(best_params.path, "w") as f:
        f.write(str(busyness_model.best_params))

    # SHAP summary plot export is currently disabled. Re-enabling it needs
    # `import shap`, `from src.utils.utils import get_image_data` and a
    # `shap_summary_plot: Output[HTML]` parameter:
    #
    # shap.summary_plot(busyness_model.shap_values, busyness_model.X_test_transformed, max_display=20) # plot shap summary plot
    # shap_plot_dataurl = get_image_data() # get image data to render the image in the html file
    # html_content = f'<html><head></head><body><h1>Shap Summary Plot</h1>\n<img src={shap_plot_dataurl} width="97%"></body></html>'
    # # Save shap summary plot as an html artifact
    # with open(shap_summary_plot.path, "w") as f:
    #     f.write(html_content)

    model.metadata["framework"] = "scikit-learn"
    # Save the model as an artifact
    with open(model.path, 'wb') as f:
        pickle.dump({
            "pipeline": busyness_model.model_pipeline,
            "target": busyness_model.target,
            "scores_dict": busyness_model.scores}, f)

  @component(


## Evaluate the Model

In [60]:
@component(
    base_image=BASE_IMAGE,
    output_component_file="model_evaluation.yaml"
)
def evaluate_busyness(
    busyness_model: Input[Model],
    metrics_baseline: Output[Metrics],
    metrics_train: Output[Metrics],
    metrics_test: Output[Metrics]):
    """Log the scores stored alongside the pickled model as pipeline metrics.

    Args:
        busyness_model: Input artifact whose path holds a pickled dict with a
            "scores_dict" entry.
        metrics_baseline: Receives every entry of ``scores_dict["baseline_scores"]``.
        metrics_train: Receives every entry of ``scores_dict["train_scores"]``.
        metrics_test: Receives every entry of ``scores_dict["test_scores"]``.
    """
    import pickle

    # The artifact is produced by this pipeline's own training step; pickle
    # must not be used to load files from untrusted sources.
    with open(busyness_model.path, 'rb') as model_file:
        score_groups = pickle.load(model_file)["scores_dict"]

    destinations = (
        ("baseline_scores", metrics_baseline),
        ("train_scores", metrics_train),
        ("test_scores", metrics_test),
    )
    for group_key, metrics_artifact in destinations:
        for metric_name, value in score_groups[group_key].items():
            metrics_artifact.log_metric(metric_name, float(value))

  @component(


## Deploy the Model

In [61]:
@component(
    base_image=BASE_IMAGE,
    install_kfp_package=False,
    output_component_file="model_deployment.yaml",
)
def deploy_busyness(
        serving_container_image_uri: str,
        display_name: str,
        model_endpoint: str,
        gcp_project: str,
        gcp_region: str,
        model: Input[Model],
        vertex_model: Output[Model],
        vertex_endpoint: Output[Model]
):
    """Upload the trained model to Vertex AI and deploy it to an endpoint.

    Args:
        serving_container_image_uri: Image used to serve the uploaded model.
        display_name: Display name for the uploaded model and for the
            deployed model on the endpoint.
        model_endpoint: Display name of the endpoint to reuse or create.
        gcp_project: Project passed to every Vertex AI list/create/upload call.
        gcp_region: Location passed to every Vertex AI list/create/upload call.
        model: Input artifact; the parent directory of its path is uploaded
            as the model's artifact location.
        vertex_model: Output artifact; its ``uri`` is set to the uploaded
            model's resource name.
        vertex_endpoint: Output artifact; its ``uri`` is set to the resource
            name of the object returned by ``deploy``.
    """
    from google.cloud import aiplatform as vertex_ai
    from pathlib import Path

    def create_endpoint():
        """Return the newest endpoint named ``model_endpoint``, creating one if none exists."""
        endpoints = vertex_ai.Endpoint.list(
            filter='display_name="{}"'.format(model_endpoint),
            order_by='create_time desc',
            project=gcp_project,
            location=gcp_region,
        )
        if endpoints:
            return endpoints[0]  # most recently created
        return vertex_ai.Endpoint.create(
            display_name=model_endpoint,
            project=gcp_project,
            location=gcp_region,
        )

    def upload_model():
        """Upload the model, as a new version when one with the same display name exists."""
        # Ordering by creation time makes index 0 the most recent model;
        # without order_by the list order is not specified here.
        existing_models = vertex_ai.Model.list(
            filter='display_name="{}"'.format(display_name),
            order_by='create_time desc',
            project=gcp_project,
            location=gcp_region,
        )
        # parent_model=None registers a brand-new model; a resource name
        # registers a new version under that existing model.
        parent_model = existing_models[0].resource_name if existing_models else None
        return vertex_ai.Model.upload(
            display_name=display_name,
            parent_model=parent_model,
            # NOTE(review): this is the artifact's local path, not a gs:// URI —
            # confirm the SDK stages it as intended.
            artifact_uri=str(Path(model.path).parent),
            serving_container_image_uri=serving_container_image_uri,
            project=gcp_project,
            location=gcp_region,
            serving_container_predict_route="/predict",
            serving_container_health_route="/health",
        )

    endpoint = create_endpoint()
    uploaded_model = upload_model()

    # Save data to the output params
    vertex_model.uri = uploaded_model.resource_name

    # Deploy the uploaded model to the endpoint; the key "0" in traffic_split
    # stands for the model being deployed by this call.
    model_deploy = uploaded_model.deploy(
        machine_type='n1-standard-1',
        endpoint=endpoint,
        traffic_split={"0": 100},
        deployed_model_display_name=display_name,
    )

    # Save data to the output params
    vertex_endpoint.uri = model_deploy.resource_name

  @component(


## Create Pipeline

In [62]:
# Timestamp suffix (YYYYMMDDHHMMSS) used to build a unique pipeline display name.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"
DISPLAY_NAME = f"pipelinebusyness-job{TIMESTAMP}"

In [63]:
serving_container_image_uri = "gcr.io/your-project/serving-image:latest"


@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline. Use to determine the pipeline Context.
    name="pipeline-regionsbusyness"
)
def pipeline(
    data_filepath: str = "data/data.csv",
    data_bucket : str = BUCKET_NAME,
    project: str = PROJECT_ID,
    region: str = REGION
):
    """Wire the components into one pipeline: load, preprocess, split, train, evaluate, deploy.

    Args:
        data_filepath: Object path of the source CSV inside ``data_bucket``.
        data_bucket: Bucket that holds the source CSV.
        project: Project forwarded to the deployment step.
        region: Region forwarded to the deployment step.
    """
    ingest_task = get_std_data(gcs_bucket=data_bucket, gcs_path=data_filepath)
    preprocess_task = preprocess_std_data(train_df=ingest_task.outputs["dataset_train"])
    split_task = train_test_split(
        dataset_in=preprocess_task.outputs["dataset_train_preprocessed"]
    )
    training_task = train_busyness(
        dataset_train=split_task.outputs["dataset_train"],
        dataset_test=split_task.outputs["dataset_test"],
    )
    trained_model = training_task.outputs["model"]

    # Evaluation and deployment both consume the trained model artifact.
    evaluate_busyness(busyness_model=trained_model)
    deploy_busyness(
        model=trained_model,
        gcp_project=project,
        gcp_region=region,
        serving_container_image_uri=serving_container_image_uri,
        display_name="rb",
        model_endpoint="rb_endpoint",
    )
    


## Compile and Run the Pipeline

In [64]:
# Compile the pipeline function into the job spec file ml_rb.json.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(pipeline_func=pipeline, package_path='ml_rb.json')

In [65]:
# Create a run from the compiled job spec file. DISPLAY_NAME carries the
# timestamp generated earlier in the notebook, so runs can be told apart; it
# was previously defined but never used.
start_pipeline = pipeline_jobs.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="ml_rb.json",
    enable_caching=False,
    location=REGION,
)

In [66]:
# RUN THE PIPELINE

start_pipeline.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/311923924433/locations/us-central1/pipelineJobs/pipeline-rb-20240117104617
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/311923924433/locations/us-central1/pipelineJobs/pipeline-rb-20240117104617')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipeline-rb-20240117104617?project=311923924433
PipelineJob projects/311923924433/locations/us-central1/pipelineJobs/pipeline-rb-20240117104617 current state:
3
PipelineJob projects/311923924433/locations/us-central1/pipelineJobs/pipeline-rb-20240117104617 current state:
3
PipelineJob projects/311923924433/locations/us-central1/pipelineJobs/pipeline-rb-20240117104617 current state:
3


RuntimeError: Job failed with:
code: 9
message: "The DAG failed because some tasks failed. The failed tasks are: [get-std-data].; Job (project_id = id-410816, job_id = 4861649266763890688) is failed due to the above error.; Failed to handle the job: {project_number = 311923924433, job_id = 4861649266763890688}"


## Make Predictions Using Vertex AI Endpoint

In [None]:
from google.cloud import aiplatform as vertex_ai

# Replace the placeholder with the endpoint's resource name or ID before
# running this cell (for example, the value the deploy step stores in
# `vertex_endpoint.uri`). The bare `<ENDPOINT_URI>` token was a SyntaxError.
endpoint_name = "<ENDPOINT_URI>"
endpoint = vertex_ai.Endpoint(endpoint_name)

In [None]:
test_df = pd.read_csv('./data/test.csv')

In [None]:
request = test_df.to_json(orient='records', lines=True)

In [None]:
predictions = endpoint.predict(instances=request.splitlines())

In [None]:
print(predictions)