In [36]:
from typing import NamedTuple
from kfp.v2 import dsl
from kfp.v2.dsl import (Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        component, 
                        OutputPath, 
                        InputPath)

from kfp.v2 import compiler
from google.cloud.aiplatform import pipeline_jobs

In [37]:
@component(
packages_to_install=["gcsfs","pandas","google-cloud-storage"]
)
def validate_input_data(filename:str)->NamedTuple("output", [("validation", str)]):
    """Check that the CSV at ``filename`` has the expected bike-share column schema.

    This is a KFP lightweight component: only the function source is shipped to
    the container, so every import it needs is done inside the body.

    Args:
        filename: Path or URI of the CSV to validate. ``gs://`` URIs are read by
            pandas through ``gcsfs`` (installed via ``packages_to_install``).

    Returns:
        A one-field tuple ``(validation,)``. ``validation`` is the string
        ``"true"`` when the header has exactly the expected columns (any order,
        no extras, none missing), otherwise ``"false"``. The value is a string
        because the pipeline compares it with ``== "true"`` in a
        ``dsl.Condition``.
    """
    import logging
    import pandas as pd

    logging.basicConfig(level=logging.INFO)

    expected_col_names = ['instant', 'dteday', 'season', 'yr', 'mnth', 'hr', 'holiday',
                          'weekday', 'workingday', 'weathersit', 'temp', 'atemp',
                          'hum', 'windspeed', 'casual', 'registered', 'cnt']

    logging.info(f"Reading file: {filename}")
    # Only the header matters for a schema check. nrows=0 parses the column
    # names without loading the whole dataset into memory.
    df = pd.read_csv(filename, nrows=0)

    num_cols = len(df.columns)
    logging.info(f"Number of columns: {num_cols}")

    is_valid = True

    # The count check also catches duplicated headers that a pure set
    # comparison could hide.
    if num_cols != len(expected_col_names):
        logging.warning(f"Expected {len(expected_col_names)} columns, found {num_cols}")
        is_valid = False

    missing = sorted(set(expected_col_names) - set(df.columns), key=str)
    unexpected = sorted(set(df.columns) - set(expected_col_names), key=str)
    if missing or unexpected:
        logging.warning(f"Missing columns: {missing}; unexpected columns: {unexpected}")
        is_valid = False

    validation = "true" if is_valid else "false"
    logging.info(f"Validation result: {validation}")
    return (validation,)
    

In [38]:
@component(
packages_to_install=["google-cloud-aiplatform","gcsfs","scikit-learn","pandas","google-cloud-storage","gsutil"]
)
def custom_training_job_component(
    project: str = "udemy-mlops",
    region: str = "us-central1",
    bucket_name: str = "sid-kubeflow-v1",
    source_blob_name: str = "bikeshare-model/model-training-code.py",
):
    """Fetch the training script from GCS and run it as a Vertex AI CustomTrainingJob.

    This is a KFP lightweight component, so all imports live inside the body.
    The defaults reproduce the previously hard-coded values, so calling it with
    no arguments behaves as before.

    Args:
        project: GCP project passed to ``aiplatform.init``.
        region: Vertex AI location passed to ``aiplatform.init``.
        bucket_name: GCS bucket holding the training script. ``gs://<bucket_name>``
            is also used as the staging bucket.
        source_blob_name: Object path of the training script inside ``bucket_name``.
    """
    from google.cloud import aiplatform
    from google.cloud import storage
    import logging

    logging.basicConfig(level=logging.INFO)

    aiplatform.init(project=project, location=region, staging_bucket=f"gs://{bucket_name}")

    # The script is copied to the component's working directory because
    # CustomTrainingJob.script_path expects a local file.
    local_script_path = "model-training-code.py"

    logging.info(f"Downloading blob {source_blob_name} from bucket {bucket_name}")
    storage_client = storage.Client()
    blob = storage_client.bucket(bucket_name).blob(source_blob_name)
    blob.download_to_filename(local_script_path)
    logging.info("Blob downloaded successfully")

    job = aiplatform.CustomTrainingJob(
        display_name="bikeshare-training-job",
        script_path=local_script_path,
        container_uri="us-docker.pkg.dev/vertex-ai/training/scikit-learn-cpu.0-23:latest",
        requirements=["gcsfs"],
    )

    logging.info("Starting training job")
    # sync=True makes run() block until the job reaches a terminal state, so
    # the job has already finished when this call returns. A separate
    # job.wait() is unnecessary.
    job.run(
        replica_count=1,
        machine_type="n1-standard-4",
        sync=True,
    )
    logging.info("Training job completed")

In [39]:
@dsl.pipeline(
    pipeline_root="gs://sid-kubeflow-v1/bikeshare-model/bikeshare-pipeline-root",
    name="bikeshare-pipeline-v1",   
)
def pipeline(
    project: str = "udemy-mlops",
    region: str = "us-central1", 
    display_name: str = "bikeshare-pipeline-v1",
    filename: str = "gs://sid-kubeflow-v1/bikeshare-model/hour.csv",
    ):
    """Validate the bike-share input CSV, then train only if validation passed.

    Args:
        project: Declared pipeline parameter.
        region: Declared pipeline parameter.
        display_name: Declared pipeline parameter.
            NOTE(review): project, region and display_name are not forwarded
            to any component here. The training component uses its own
            defaults. Wire them through or remove them.
        filename: URI of the CSV checked by ``validate_input_data``. It was
            previously hard-coded and is now overridable at submit time via
            ``parameter_values``; the default is unchanged.
    """
    validate_input_ds = validate_input_data(filename=filename)

    # The training step runs only when the validator emitted the string "true".
    with dsl.Condition(validate_input_ds.outputs["validation"] == "true", name="Check if input ds is valid"):
        custom_training_job_component().after(validate_input_ds)

PIPELINE_TEMPLATE_PATH = "bikeshare-pipeline-v1.json"

# Compile the pipeline definition into a JSON template on local disk ...
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=PIPELINE_TEMPLATE_PATH,
)

# ... then submit that same template to Vertex AI Pipelines with caching off,
# so every step re-executes on each run.
start_pipeline = pipeline_jobs.PipelineJob(
    display_name="bikeshare-pipeline-v1",
    template_path=PIPELINE_TEMPLATE_PATH,
    location="us-central1",
    enable_caching=False,
)

start_pipeline.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/1090925531874/locations/us-central1/pipelineJobs/bikeshare-pipeline-v1-20230718165123
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/1090925531874/locations/us-central1/pipelineJobs/bikeshare-pipeline-v1-20230718165123')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/bikeshare-pipeline-v1-20230718165123?project=1090925531874
PipelineJob projects/1090925531874/locations/us-central1/pipelineJobs/bikeshare-pipeline-v1-20230718165123 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/1090925531874/locations/us-central1/pipelineJobs/bikeshare-pipeline-v1-20230718165123 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/1090925531874/locations/us-central1/pipelineJobs/bikeshare-pipeline-v1-20230718165123 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/10909255