In [10]:
!pip install --user --upgrade kfp google-cloud-aiplatform google-cloud-pipeline-components

Collecting google-cloud-pipeline-components
  Obtaining dependency information for google-cloud-pipeline-components from https://files.pythonhosted.org/packages/df/e8/04e989b5e22d7cfc1208d15852e3ef57c881b439a4cfe241372d1fcdde77/google_cloud_pipeline_components-2.13.1-py3-none-any.whl.metadata
  Downloading google_cloud_pipeline_components-2.13.1-py3-none-any.whl.metadata (5.9 kB)
Downloading google_cloud_pipeline_components-2.13.1-py3-none-any.whl (1.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m43.9 MB/s[0m eta [36m0:00:00[0m
[0mInstalling collected packages: google-cloud-pipeline-components
Successfully installed google-cloud-pipeline-components-2.13.1


In [1]:
import os
import posixpath
from datetime import datetime

import kfp
from google.cloud import aiplatform
from kfp import compiler

In [2]:
# Project and bucket used by the experiments below.
PROJECT_ID = "donuts-dev"  # @param {type:"string"}

# BUCKET_URI has the form gs://<bucket-name>.
BUCKET_URI = "gs://donuts-dev-usc1"  # @param {type:"string"}
REGION = "us-central1"  # @param {type: "string"}

# GCS URIs always use "/" as separator, so they are joined with posixpath
# instead of os.path (which would insert "\" on Windows). posixpath.join also
# avoids a doubled slash when BUCKET_URI is entered with a trailing "/".
PIPELINE_ROOT_PATH = posixpath.join(BUCKET_URI, "pipeline_root")

STAGING_BUCKET = posixpath.join(BUCKET_URI, "temporal")
MODEL_BUCKET = posixpath.join(STAGING_BUCKET, "keras_yolov8")

In [3]:

# --- Training job settings ---
TRAIN_MACHINE_TYPE = "n1-highmem-16"
TRAIN_ACCELERATOR_TYPE = "NVIDIA_TESLA_V100"
TRAIN_NUM_GPU = 2
TRAIN_CONTAINER_URI = "us-docker.pkg.dev/vertex-ai/vertex-vision-model-garden-dockers/keras-yolov8-train"
TRAINING_JOB_PREFIX = "train_yolov8"
MODEL_DISPLAY_NAME = "yolov8_fine_tuned"

# --- Upload / deployment settings ---
UPLOAD_JOB_PREFIX = "upload_yolov8"
DEPLOY_JOB_PREFIX = "deploy_yolov8"
SERVING_CONTAINER_URI = "us-docker.pkg.dev/vertex-ai-restricted/prediction/tf_opt-gpu.2-12:latest"
SERVING_ACCELERATOR_TYPE = "NVIDIA_TESLA_T4"
SERVING_MACHINE_TYPE = "n1-standard-4"
SERVING_CONTAINER_ARGS = [
    "--allow_precompilation",
    "--allow_compression",
]

# Handed to the training container through the RESOLUTION environment variable
# (presumably the input image size in pixels -- confirm against the container).
RESOLUTION = 512

def get_job_name_with_datetime(prefix: str):
    """Return ``prefix`` followed by the current time as ``_YYYYMMDD_HHMMSS``.

    Used to build timestamped names when triggering training or deployment
    jobs in Vertex AI. The timestamp comes from ``datetime.now()``.
    """
    timestamp_suffix = datetime.now().strftime("_%Y%m%d_%H%M%S")
    return prefix + timestamp_suffix

In [23]:
# Timestamped name of this training run; it also names the model output folder.
train_job_name = get_job_name_with_datetime(TRAINING_JOB_PREFIX)
# GCS URIs always use "/" as separator, so join with posixpath rather than
# os.path (which would insert "\" on Windows).
model_dir = posixpath.join(MODEL_BUCKET, train_job_name)

# Values forwarded to the training container as command-line flags.
input_csv_path: str = "gs://cloud-samples-data/vision/salads.csv"
epochs: int = 10
learning_rate: float = 0.0005
fpn_depth: int = 3
confidence_threshold: float = 0.02
iou_threshold: float = 0.3
backbone: str = "yolo_v8_xl_backbone_coco"

# Single-replica worker pool for the custom training job.
# NOTE: the f-strings below are evaluated when this cell runs, so the value of
# `model_dir` at this point is baked into `--output_model_dir`. Overriding the
# pipeline's `model_dir` parameter later does not change where training writes.
worker_pool_specs = [
    {
        "machine_spec": {
            "machine_type": TRAIN_MACHINE_TYPE,
            "accelerator_type": TRAIN_ACCELERATOR_TYPE,
            "accelerator_count": TRAIN_NUM_GPU,
        },
        "replica_count": 1,
        "disk_spec": {
            "boot_disk_type": "pd-ssd",
            "boot_disk_size_gb": 500,
        },
        "container_spec": {
            "image_uri": TRAIN_CONTAINER_URI,
            "command": [],
            "env": [
                {
                    "name": "RESOLUTION",
                    "value": f"{RESOLUTION}",
                },
            ],
            "args": [
                f"--input_csv_path={input_csv_path}",
                f"--output_model_dir={model_dir}",
                f"--epochs={epochs}",
                f"--pretrained_backbone={backbone}",
                f"--fpn_depth={fpn_depth}",
                f"--learning_rate={learning_rate}",
                f"--confidence_threshold={confidence_threshold}",
                f"--iou_threshold={iou_threshold}",
            ],
        },
    }
]

'gs://donuts-dev-usc1/temporal/keras_yolov8/train_yolov8_20240424_185924'

In [35]:
@kfp.dsl.pipeline(
    name="yolov8_finetuning"
)
def pipeline(
    project: str = PROJECT_ID,
    model_display_name: str = MODEL_DISPLAY_NAME,
    model_dir: str = model_dir,
    gpu_count: int = TRAIN_NUM_GPU,
    worker_pool_specs: list = worker_pool_specs,
    skip_training: bool = False
):
    """Optionally fine-tune YOLOv8, then import and upload the model.

    When ``skip_training`` is true, the model found at ``model_dir`` is
    imported and uploaded as-is. Otherwise a custom training job runs first and
    the import step waits for it to finish.

    Args:
        project: Project passed to the training and upload steps.
        model_display_name: Display name given to the uploaded model.
        model_dir: URI the unmanaged model artifact is imported from.
        gpu_count: Not referenced by the pipeline body.
        worker_pool_specs: Worker pool specs passed to the custom training job.
        skip_training: Skip the training job and only import and upload.
    """
    from google_cloud_pipeline_components.types import artifact_types
    from google_cloud_pipeline_components.v1.custom_job import \
        CustomTrainingJobOp
    from google_cloud_pipeline_components.v1.model import ModelUploadOp
    from kfp.dsl import importer_node

    def _import_and_upload_model(upstream_task=None):
        """Import the model at ``model_dir`` and upload it, optionally after a task."""
        import_task = importer_node.importer(
            artifact_uri=model_dir,
            artifact_class=artifact_types.UnmanagedContainerModel,
            metadata={
                "containerSpec": {
                    "imageUri": SERVING_CONTAINER_URI,
                },
            },
        )
        if upstream_task is not None:
            # Explicit ordering: the importer has no data dependency on the
            # upstream task, so it must be told to wait for it.
            import_task = import_task.after(upstream_task)
        return ModelUploadOp(
            project=project,
            display_name=model_display_name,
            unmanaged_container_model=import_task.outputs["artifact"],
        )

    # While compiling, `skip_training` is a pipeline-parameter placeholder, so
    # the explicit `== True` comparison is what builds the runtime condition.
    with kfp.dsl.If(skip_training == True):  # noqa: E712
        _import_and_upload_model()

    with kfp.dsl.Else():
        # NOTE: the display name is computed when this function body executes,
        # i.e. once per compilation rather than once per pipeline run.
        fine_tuning_op = CustomTrainingJobOp(
            project=project,
            display_name=get_job_name_with_datetime(TRAINING_JOB_PREFIX),
            worker_pool_specs=worker_pool_specs,
        )
        _import_and_upload_model(upstream_task=fine_tuning_op)

        # TODO: run evaluation

        # TODO: deploy model if it fits certain criteria
    


In [36]:
# Compile the pipeline function into a local pipeline spec file. The file name
# must match the `template_path` used when the PipelineJob is created.
compiler.Compiler().compile(package_path="yolov8_finetune_pipeline.json", pipeline_func=pipeline)

In [39]:
DISPLAY_NAME = get_job_name_with_datetime("yolov8_finetuning")

# Create the job from the compiled spec. Project and location are passed
# explicitly so the job runs against the notebook's configured PROJECT_ID and
# REGION instead of whatever defaults the SDK picks up from the environment.
job = aiplatform.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="yolov8_finetune_pipeline.json",
    pipeline_root=PIPELINE_ROOT_PATH,
    parameter_values={
        # Presumably the output folder of an earlier training run -- verify it
        # still exists before re-running with skip_training=True.
        "model_dir": "gs://donuts-dev-usc1/temporal/keras_yolov8/train_yolov8_20240424_132734",
        "skip_training": True,
    },
    enable_caching=True,
    project=PROJECT_ID,
    location=REGION,
)

# Blocks until the pipeline run finishes, printing state updates as it goes.
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/633265597134/locations/us-central1/pipelineJobs/yolov8-finetuning-20240424192521
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/633265597134/locations/us-central1/pipelineJobs/yolov8-finetuning-20240424192521')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/yolov8-finetuning-20240424192521?project=633265597134
PipelineJob projects/633265597134/locations/us-central1/pipelineJobs/yolov8-finetuning-20240424192521 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/633265597134/locations/us-central1/pipelineJobs/yolov8-finetuning-20240424192521 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/633265597134/locations/us-central1/pipelineJobs/yolov8-finetuning-20240424192521 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/633265597134/locations/us-central1/pip