In [1]:
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Output,
                        Model,
                        Metrics,
                        Markdown,
                        HTML,
                        component, 
                        OutputPath, 
                        InputPath)

from kfp.v2 import compiler
from google.cloud.aiplatform import pipeline_jobs, get_pipeline_df, init

from src.config.gcp_config import load_config

  from kfp.v2 import dsl


In [2]:
# Load the project configuration and dump it to a plain Python structure.
# It is indexed by key below (e.g. config["base_image"]) and passed to the
# pipeline components through their `config: dict` parameter.
config = load_config().model_dump()

In [8]:
@component(
    base_image=config["base_image"],
    output_component_file="get_and_process_data.yaml"
)
def get_and_process_data(config: dict, output_data_path: OutputPath("Dataset"), model_training_date: Output[Artifact]):
    """Load the raw table from BigQuery, preprocess it and publish the results.

    The component:
      * selects every row of ``config["main_table_id"]``,
      * runs ``preprocess_data`` with ``"pokemonId"`` as the target column,
      * writes the processed frame as CSV to ``output_data_path``,
      * records the training timestamp on the ``model_training_date`` artifact
        (both in its metadata and as its file payload),
      * pickles the training features and uploads them to
        ``models/<training date>/preprocess_features`` in ``config["gcp_bucket"]``.

    Args:
        config: Settings dict; the keys read here are ``region``,
            ``gcp_project_id``, ``main_table_id`` and ``gcp_bucket``.
        output_data_path: Local path where the processed dataset CSV is written.
        model_training_date: Output artifact that carries the training
            timestamp (format ``%d-%m-%Y:%H%M``) to downstream steps.
    """
    # Imports live inside the function because the component body is executed
    # on its own inside the component's container image.
    from google.cloud import bigquery
    from src.pipelines.utils import preprocess_data
    from datetime import datetime
    import os
    import tempfile
    import pickle
    from google.cloud import storage

    bq_client = bigquery.Client(location=config["region"], project=config["gcp_project_id"])
    query = f'SELECT * FROM `{config["main_table_id"]}`'
    df = bq_client.query(query).to_dataframe()

    # Keep the timestamp in its own local. Rebinding `model_training_date`
    # would shadow the output artifact, leaving it empty for downstream steps.
    training_date = datetime.now().strftime("%d-%m-%Y:%H%M")

    # Publish the timestamp through the artifact: metadata for cheap access,
    # file payload so the artifact is not empty.
    model_training_date.metadata["training_date"] = training_date
    artifact_dir = os.path.dirname(model_training_date.path)
    if artifact_dir:
        os.makedirs(artifact_dir, exist_ok=True)
    with open(model_training_date.path, "w") as date_file:
        date_file.write(training_date)

    X_processed, training_features = preprocess_data(df=df, target_column_name="pokemonId")
    # index=False: otherwise the consumer's plain `pd.read_csv` turns the
    # DataFrame index into an extra "Unnamed: 0" column.
    X_processed.to_csv(output_data_path, index=False)

    with tempfile.NamedTemporaryFile() as tmp_pickle:
        pickle.dump(training_features, tmp_pickle)
        storage_client = storage.Client(project=config["gcp_project_id"])
        bucket = storage_client.bucket(config["gcp_bucket"])
        upload_blob = bucket.blob(f"models/{training_date}/preprocess_features")
        # rewind=True seeks back to the start so the whole pickle is uploaded.
        upload_blob.upload_from_file(tmp_pickle, rewind=True)

  @component(
  def get_and_process_data(config: dict, output_data_path: OutputPath("Dataset")):


In [28]:
@component(
    base_image=config["base_image"],
    install_kfp_package=False,
    output_component_file="train_model.yaml",
)
def train_model(config: dict,
                dataset: Input[Dataset],
                model_training_date: Input[Artifact],
                metrics: Output[Metrics]):
    """Train the model on the processed dataset, log accuracy and upload the model.

    The component reads the CSV at ``dataset.path``, trains via ``model_train``
    with ``"pokemonId"`` as the target column, logs the accuracy (as a
    percentage, computed on the same data used for training) and uploads the
    pickled model to ``models/<training date>/model`` in ``config["gcp_bucket"]``.

    Args:
        config: Settings dict; the keys read here are ``gcp_project_id`` and
            ``gcp_bucket``.
        dataset: Input artifact whose ``path`` points at the processed CSV.
        model_training_date: Input artifact carrying the training timestamp,
            either under the ``"training_date"`` metadata key or as the text
            content of the artifact file.
        metrics: Output metrics artifact; receives the ``"accuracy"`` metric.
    """
    # Imports live inside the function because the component body is executed
    # on its own inside the component's container image.
    import os
    import pandas as pd
    import pickle
    import tempfile
    from google.cloud import storage
    from datetime import datetime
    from sklearn.metrics import accuracy_score

    from src.pipelines.utils import model_train

    X_processed = pd.read_csv(dataset.path)

    model = model_train(X_processed=X_processed, target_column_name="pokemonId")

    # log accuracy (arguments in the documented y_true, y_pred order)
    features = X_processed.drop("pokemonId", axis=1)
    result = accuracy_score(X_processed["pokemonId"], model.predict(features))
    metrics.log_metric("accuracy", (result * 100.0))

    # `model_training_date` is an artifact object, not a string. Formatting it
    # straight into the blob name would embed the object's repr in the path,
    # so resolve the actual timestamp first.
    training_date = (model_training_date.metadata or {}).get("training_date")
    artifact_path = model_training_date.path
    if not training_date and artifact_path and os.path.isfile(artifact_path):
        with open(artifact_path) as date_file:
            training_date = date_file.read().strip()
    if not training_date:
        # The producer did not populate the artifact; fall back to the current
        # time (same format) so the model still lands under a dated folder.
        training_date = datetime.now().strftime("%d-%m-%Y:%H%M")

    with tempfile.NamedTemporaryFile() as tmp_pickle:
        pickle.dump(model, tmp_pickle)
        storage_client = storage.Client(project=config["gcp_project_id"])
        bucket = storage_client.bucket(config["gcp_bucket"])
        upload_blob = bucket.blob(f"models/{training_date}/model")
        # rewind=True seeks back to the start so the whole pickle is uploaded.
        upload_blob.upload_from_file(tmp_pickle, rewind=True)


  @component(
  def train_model(config: dict,


In [29]:
@dsl.pipeline(
    name="train-model",
    pipeline_root=config["pipeline_root"],
)
def pipeline():
    """Define the "train-model" pipeline: preprocessing followed by training.

    The training step consumes the dataset and the training-date artifact
    produced by the preprocessing step and is ordered after it.
    """
    preprocess_task = get_and_process_data(config=config)
    preprocess_task.set_display_name('Load and preprocess')

    training_task = train_model(
        config=config,
        dataset=preprocess_task.outputs["output_data_path"],
        model_training_date=preprocess_task.outputs["model_training_date"],
    )
    training_task.after(preprocess_task)
    training_task.set_display_name('Model training')

In [30]:
# Compile the pipeline function into a JSON job spec; the PipelineJob below
# loads this file through its `template_path`.
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='model_training_pipeline.json')

In [31]:
# Build the Vertex AI pipeline job from the compiled template.
# Caching is disabled for this job, and it targets the configured region.
start_pipeline = pipeline_jobs.PipelineJob(
    display_name="cc-project-pipeline",
    template_path="model_training_pipeline.json",
    enable_caching=False,
    location=config["region"],
)

In [32]:
# Submit the job; the log output below shows the job state being polled
# while the pipeline is running.
start_pipeline.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/33891971032/locations/europe-west3/pipelineJobs/train-model-20240408185455
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/33891971032/locations/europe-west3/pipelineJobs/train-model-20240408185455')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west3/pipelines/runs/train-model-20240408185455?project=33891971032
PipelineJob projects/33891971032/locations/europe-west3/pipelineJobs/train-model-20240408185455 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/33891971032/locations/europe-west3/pipelineJobs/train-model-20240408185455 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/33891971032/locations/europe-west3/pipelineJobs/train-model-20240408185455 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/33891971032/locations/europe-west3/pipelineJobs/train-model-20240408185455

In [34]:
# Point the Vertex AI SDK at the configured project/region, then fetch the
# run history of the "train-model" pipeline as a DataFrame (one row per run,
# with `param.*` and `metric.*` columns as shown in the output below).
init(project=config["gcp_project_id"], location=config["region"])
pipeline_df = get_pipeline_df(pipeline="train-model")
# Last expression of the cell: rendered as the cell's output.
pipeline_df

Unnamed: 0,pipeline_name,run_name,param.vmlmd_lineage_integration,param.vertex-ai-pipelines-artifact-argument-binding,metric.accuracy
0,train-model,train-model-20240408185455,{'pipeline_run_component': {'parent_task_names...,{'output:train-model-metrics': ['projects/3389...,0.923982
1,train-model,train-model-20240408185104,{'pipeline_run_component': {'parent_task_names...,,
2,train-model,train-model-20240408184607,{'pipeline_run_component': {'parent_task_names...,,
3,train-model,train-model-20240408183946,{'pipeline_run_component': {'location_id': 'eu...,,
4,train-model,train-model-20240408182413,{'pipeline_run_component': {'parent_task_names...,,
5,train-model,train-model-20240407163619,{'pipeline_run_component': {'location_id': 'eu...,{'output:train-model-metrics': ['projects/3389...,70.462742
6,train-model,train-model-20240407163150,{'pipeline_run_component': {'parent_task_names...,,
