In [1]:
import kfp
from kfp.v2.dsl import component
import google.cloud.aiplatform as aip

In [2]:
BUCKET_NAME="gs://garrido-ml-models"
REGION="us-central1"
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PROJECT_ID="ml-demos-garrido"

In [3]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

In [4]:
@kfp.dsl.pipeline(name="r-model-iris-pipeline")
def pipeline(
    project: str = PROJECT_ID,
    model_display_name: str = "rmodel",
    region: str = REGION
):
    from google_cloud_pipeline_components.types import artifact_types 
    from google_cloud_pipeline_components.v1.custom_job import \
        CustomTrainingJobOp
    from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,
                                                              ModelDeployOp)
    from google_cloud_pipeline_components.aiplatform import ModelUploadOp
    from kfp.v2.components import importer_node

    custom_job_task = CustomTrainingJobOp(
        project=project,
        display_name=f"{model_display_name}-training",
        worker_pool_specs=[
            {
                "containerSpec": {
                    "imageUri": "gcr.io/ml-demos-garrido/rmodel:latest",
                },
                "replicaCount": "1",
                "machineSpec": {
                    "machineType": "n1-standard-16"
                },
            }
        ],
    )

    model_upload_op = ModelUploadOp(
        project=project,
        location=region,
        display_name=model_display_name,
        serving_container_image_uri="gcr.io/ml-demos-garrido/rserving:latest",
        serving_container_ports=[{"containerPort": 7080}],
        serving_container_predict_route="/clasiffy",
        serving_container_health_route="/ping",
    )
    model_upload_op.after(custom_job_task)

    endpoint_create_op = EndpointCreateOp(
        project=project,
        display_name="rmodel-endpoint",
    )

    ModelDeployOp(
        endpoint=endpoint_create_op.outputs["endpoint"],
        model=model_upload_op.outputs["model"],
        deployed_model_display_name=model_display_name,
        traffic_split={"0":100}
        service_account="mlops-services@ml-demos-garrido.iam.gserviceaccount.com",
        dedicated_resources_machine_type="n1-standard-16",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
    )

In [5]:
from kfp.v2 import compiler  # noqa: F811

compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="r_model_pipeline.json"
)



In [6]:
DISPLAY_NAME = "r_model" + TIMESTAMP

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="r_model_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,
)

job.run()