In [1]:
# Create the directory that receives the compiled component and pipeline files.
# Done in Python instead of a `!` shell escape so the cell does not depend on a
# POSIX shell; `exist_ok=True` keeps it idempotent, like `mkdir -p`.
from pathlib import Path

Path("_artifacts").mkdir(parents=True, exist_ok=True)

In [None]:
import os
from dotenv import load_dotenv

# Load variables from a local `.env` file (if one exists) into the process
# environment; this has to happen before the lookup below.
load_dotenv()

# Project id reused further down for the training image URI, the BigQuery
# client, the pipeline root bucket and the service account. Subscript access
# raises KeyError right here when the variable is not set.
GCP_PROJECT_ID = os.environ["GCP_PROJECT_ID"]

In [2]:
from typing import Optional, NamedTuple

import kfp
from kfp import components
from kfp.v2 import compiler
from kfp.v2.dsl import (
    component,
    Input,
    InputPath,
    OutputPath,
    Output,
    Dataset,
    Metrics,
)




@component(
    base_image="python:3.9-slim",
    # `db-dtypes` is required by `to_dataframe()` in google-cloud-bigquery 3.x;
    # the packages are unpinned, so the container installs such a release.
    packages_to_install=["google-cloud-bigquery", "db-dtypes", "pandas", "pyarrow"],
    output_component_file="_artifacts/query.yaml",
)
def query(
    query: str, output_path: OutputPath("Dataset"), project_id: Optional[str] = None
) -> None:
    """Run a BigQuery SQL query and store the result as a Parquet file.

    Args:
        query: SQL statement to execute.
        output_path: Path of the output `Dataset` artifact; the query result
            is written there in Parquet format.
        project_id: Project passed to the BigQuery client. When None, the
            client is left to infer the project from its environment.
    """
    # Imported inside the body so the dependency is resolved in the component's
    # container, where `packages_to_install` has been installed.
    from google.cloud import bigquery

    client = bigquery.Client(project=project_id)
    job = client.query(query)

    # Materialise the full result set in memory, then persist it for downstream steps.
    result_df = job.to_dataframe()
    result_df.to_parquet(output_path)


# Container-based training component, declared inline as a KFP component spec.
# The spec is an f-string: `{GCP_PROJECT_ID}` is interpolated when this cell
# runs, while doubled braces (`{{ ... }}`) come out as literal YAML braces.
# The container is invoked as `penguin-model <train_dataset path> <model path>`:
# it reads the `train_dataset` input and writes the `model` output.
train_model = components.load_component_from_text(
    f"""
name: Train model
description: Trains our model

inputs:
- {{name: train_dataset, type: Dataset, description: 'Train dataset'}}

outputs:
- {{name: model, type: Model, description: 'Output model'}}

implementation:
  container:
    image: europe-west1-docker.pkg.dev/{GCP_PROJECT_ID}/vertex/penguin_model
    command: [
      penguin-model, 
      {{inputPath: train_dataset}},
      {{outputPath: model}}
    ]
"""
)


@component(
    base_image="python:3.9-slim", output_component_file="_artifacts/eval_model.yaml"
)
def eval_model(
    model_path: InputPath("Model"), metrics: Output[Metrics]
) -> NamedTuple("EvalModelOutput", [("roc", float)]):
    """Log evaluation metrics for the trained model and return the ROC score.

    The score is currently a hard-coded placeholder (0.9); the model at
    `model_path` is only printed, not evaluated.

    Args:
        model_path: Local path of the input `Model` artifact.
        metrics: Output `Metrics` artifact that receives the `roc` metric.

    Returns:
        An `EvalModelOutput` named tuple with a single field, `roc`.
    """
    # Imported inside the body so the function stays self-contained when it
    # runs as a component.
    from collections import namedtuple

    print(model_path)

    roc = 0.9
    metrics.log_metric("roc", roc)

    # The signature declares a `roc` output, so it must actually be returned;
    # falling off the end would return None and leave that output unset.
    return namedtuple("EvalModelOutput", ["roc"])(roc)


@kfp.dsl.pipeline(name="penguin")
def pipeline():
    """Penguin pipeline: query BigQuery, train on the result, evaluate the model."""
    penguins_sql = "SELECT * FROM bigquery-public-data.ml_datasets.penguins"

    # Step 1: export the public penguins table as a dataset artifact.
    extract_step = query(penguins_sql, project_id=GCP_PROJECT_ID)
    penguins_dataset = extract_step.outputs["output_path"]

    # Step 2: train on that dataset.
    # To request a GPU, chain the following onto the call below
    # (docs: https://www.kubeflow.org/docs/distributions/gke/pipelines/enable-gpu-and-tpu/):
    #     .set_gpu_limit(1).add_node_selector_constraint(
    #         "cloud.google.com/gke-accelerator", "nvidia-tesla-k80"
    #     )
    training_step = train_model(penguins_dataset)

    # Step 3: evaluate the trained model.
    eval_model(training_step.outputs["model"])

    # Not wired in yet -- sketch of a model upload step:
    # model_upload_op = gcc_aip.ModelUploadOp(
    #     project=project,
    #     display_name=model_display_name,
    #     artifact_uri=WORKING_DIR,
    #     serving_container_image_uri=serving_container_image_uri,
    #     # serving_container_environment_variables={"NOT_USED": "NO_VALUE"},
    # )
    # model_upload_op.after(training_step)


# Compile the pipeline function into the JSON job spec under `_artifacts/`.
compiler.Compiler().compile(
    package_path="_artifacts/pipeline.json", pipeline_func=pipeline
)



In [4]:
from google.cloud.aiplatform.pipeline_jobs import PipelineJob

# Describe a Vertex AI pipeline run based on the compiled spec.
job = PipelineJob(
    display_name="penguins",
    enable_caching=False,
    template_path="_artifacts/pipeline.json",
    parameter_values={},
    pipeline_root=f"gs://{GCP_PROJECT_ID}-penguin-artifacts/pipelines",
    # Pin the project explicitly: the artifact bucket and the service account
    # below both belong to GCP_PROJECT_ID, so the job should not depend on
    # whatever default project the local credentials happen to resolve to.
    project=GCP_PROJECT_ID,
    location="europe-west1",
)

# Submit the run under the dedicated pipeline service account.
job.run(
    service_account=f"vmd-penguin@{GCP_PROJECT_ID}.iam.gserviceaccount.com"
)

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/1035892568606/locations/europe-west1/pipelineJobs/penguin-20211119123746
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/1035892568606/locations/europe-west1/pipelineJobs/penguin-20211119123746')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west1/pipelines/runs/penguin-20211119123746?project=1035892568606
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/1035892568606/locations/europe-west1/pipelineJobs/penguin-20211119123746 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/1035892568606/locations/europe-west1/pipelineJobs/penguin-20211119123746 curr