In [1]:
import os
import time
import logging
import kfp
from google.cloud import bigquery, storage
from google.cloud import aiplatform as vertex_ai
from google_cloud_pipeline_components.experimental.custom_job import utils
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component
from typing import NamedTuple
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output, Metrics,
                        OutputPath, component)

from google_cloud_pipeline_components.experimental.custom_job import utils

In [2]:
# Configure the root logger with an INFO threshold for the rest of the notebook.
logging.basicConfig(level=logging.INFO)

## Load Params and Resource Config

In [5]:
from config.gcp_resource import *

In [6]:
if PROJECT_ID == "" or PROJECT_ID is None or PROJECT_ID == "[your-project-id]":
    # Get your GCP project id from gcloud
    shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    
if SERVICE_ACCOUNT == "" or SERVICE_ACCOUNT is None or SERVICE_ACCOUNT == "[your-service-account]":
    # Get your GCP project id from gcloud
    shell_output = !gcloud config list --format 'value(core.account)' 2>/dev/null
    SERVICE_ACCOUNT = shell_output[0]
    
if GCS_BUCKET == "" or GCS_BUCKET is None or GCS_BUCKET == "[your-bucket-name]":
    # Get your bucket name to GCP projet id
    GCS_BUCKET = PROJECT_ID
    # Try to create the bucket if it doesn'exists
    ! gsutil mb -l $REGION gs://$BUCKET
    print("")
    
!gcloud config set project {PROJECT_ID}

Updated property [core/project].


In [7]:
# Echo the resolved compute / image settings so the run is self-describing.
print("Train machine type", TRAIN_COMPUTE)
print("Deploy machine type", DEPLOY_COMPUTE)
print("Deployment:", DEPLOY_IMAGE)

# Echo the GCS locations the pipeline reads from and writes to.
for root_label, root_uri in (
    ("PIPELINE_ROOT", PIPELINE_ROOT),
    ("MODULE_ROOT", MODULE_ROOT),
    ("DATA_ROOT", DATA_ROOT),
    ("SERVING_MODEL_DIR", SERVING_MODEL_DIR),
):
    print(f"{root_label}: {root_uri}")

Train machine type n1-standard-4
Deploy machine type n1-standard-4
Deployment: us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest
PIPELINE_ROOT: gs://mle_airbus_dataset/airbusmlepipeline/pipeline_root
MODULE_ROOT: gs://mle_airbus_dataset/airbusmlepipeline/pipeline_module
DATA_ROOT: gs://mle_airbus_dataset/airbusmlepipeline/data
SERVING_MODEL_DIR: gs://mle_airbus_dataset/airbusmlepipeline/serving_model


## Create KFP pipeline

In [26]:
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,
                                                          ModelDeployOp)
from google_cloud_pipeline_components import aiplatform as gcpc_aip
from kfp.v2.components import importer_node

In [27]:
# Load the component definitions from the YAML specs under ./build.
# Both files must already exist on disk; loading raises FileNotFoundError otherwise.
_load_component_spec = kfp.components.load_component_from_file

trainer_component = _load_component_spec("./build/tensorflow_airbus.yaml")
gen_train_hist_component = _load_component_spec("./build/gen_train_hist_component.yaml")

FileNotFoundError: [Errno 2] No such file or directory: './build/tensorflow_airbus.yaml'

In [28]:
# Define the pipeline
@dsl.pipeline(
   name='airbus-mle',
   description='Model training pipeline for MLE Project for airbus detection.',
   pipeline_root=PIPELINE_ROOT
)
def airbus_component_deploy_pipeline(
    project_dict: dict, 
    source_url: str,
    gcs_bucket: str,
    metrics_thresholds: dict,
    model_output_folder: str
):
    """Wire the import, training, evaluation, upload and deployment steps into one pipeline.

    Step order: data import -> input statistics and model training ->
    model evaluation -> evaluation test -> model import -> model upload ->
    model deployment -> deployment test. The endpoint is created
    independently of the training branch and is consumed by the deployment
    and deployment-test steps.

    Args:
        project_dict: Forwarded unchanged to the import, statistics and
            deployment-test components.
        source_url: Currently not referenced by any step.
        gcs_bucket: Currently not referenced by any step.
        metrics_thresholds: Currently not referenced by any step.
        model_output_folder: Currently not referenced by any step.

    The function intentionally returns nothing: a pipeline function's return
    value is interpreted as pipeline outputs, and a bare ``True`` is not a
    task output.
    """
    # NOTE(review): import_file_component, model_eval_component,
    # model_eval_test_component and test_deployment_component are not defined
    # in the visible part of this notebook; they must be defined or loaded
    # before this pipeline is compiled.
    import_task = import_file_component(
        project_dict = project_dict
    ).set_display_name("Import data from BigQuery and run preprocessing")
    
    gen_train_hist_task = gen_train_hist_component(
        project_dict = project_dict
    ).after(import_task).set_display_name("Generate input image statistics")
    
    # NOTE(review): the model directory is hard-coded while the
    # model_output_folder parameter goes unused -- confirm which is intended.
    trainer_path = trainer_component(
        model_dir = 'gs://mle_airbus_dataset/trained_model/',
        train_data_dir = import_task.outputs['train_data_fpath'],
        eval_data_dir = import_task.outputs['test_data_fpath']
    ).set_display_name("Model training").after(import_task)
        
    model_eval_task = model_eval_component(
        test_filepath = import_task.outputs['test_data_fpath'],
        model_filepath = trainer_path.output
    ).set_display_name("Run model evaluation on selected metrics").after(trainer_path)
    
    # NOTE(review): metrics_thresholds is not passed here -- confirm whether
    # the evaluation test is expected to receive it.
    model_eval_test = model_eval_test_component(
        metrics = model_eval_task.output
    ).set_display_name("Test if model evaluation results passed")

    # The importer is ordered after the evaluation test so that the upload
    # and deployment steps only start once that test has run.
    importer_spec = importer_node.importer(
      artifact_uri= trainer_path.output,
      artifact_class=artifact_types.UnmanagedContainerModel,
      metadata={
          'containerSpec': {
              'imageUri': DEPLOY_IMAGE
          }
      }).after(model_eval_test).set_display_name("Import trained model image")

    model_upload_with_artifact_op = gcpc_aip.ModelUploadOp(
      project=PROJECT_ID,
      location=REGION,
      display_name=MODEL_DISPLAY_NAME,
      unmanaged_container_model=importer_spec.outputs['artifact']).set_display_name("Model upload")

    model_upload_with_artifact_op.after(importer_spec)

    endpoint_create_op = EndpointCreateOp(
        project=PROJECT_ID,
        location=REGION,
        display_name=ENDPOINT_DISPLAY_NAME,
    ).set_display_name("Create end point for deployment")

    model_deploy_op = ModelDeployOp(
        endpoint=endpoint_create_op.outputs["endpoint"],
        model=model_upload_with_artifact_op.outputs["model"],
        deployed_model_display_name=MODEL_DEPLOY_DISPLAY_NAME,
        dedicated_resources_machine_type=DEPLOY_COMPUTE,
        dedicated_resources_accelerator_type=ACCELERATOR_TYPE,
        dedicated_resources_accelerator_count=1,
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
        traffic_split = {0: 100},
    ).set_display_name("Model deployment and serving")

    test_deployment_task = test_deployment_component(
        endpoint = endpoint_create_op.outputs["endpoint"],
        project_dict = project_dict
    ).after(model_deploy_op).set_display_name("Test model deployment")

## Pipeline Compilation and Submission

In [29]:
from kfp.v2 import compiler  

# Single source of truth for the compiled job spec, shared by the compile
# step and the job submission below.
PIPELINE_PACKAGE_PATH = "airbusmle_pipeline.json"

# Compile the pipeline function into a local job spec file.
compiler.Compiler().compile(
    pipeline_func=airbus_component_deploy_pipeline,
    package_path=PIPELINE_PACKAGE_PATH,
)

# Runtime values for the pipeline's parameters; keys match the pipeline
# function's argument names.
params = {'project_dict': project_dict, 
        'source_url': source_url, 
        'gcs_bucket': GCS_BUCKET,
        'metrics_thresholds': metrics_thresholds,
       'model_output_folder': 'default'}

# The Vertex AI SDK is imported as `vertex_ai` in the first cell; the
# previous `aip` alias is not defined in this notebook.
job = vertex_ai.PipelineJob(
    display_name='default',
    template_path=PIPELINE_PACKAGE_PATH,
    pipeline_root=PIPELINE_ROOT,
    parameter_values=params,
    enable_caching=False,
)
job.submit()



NameError: name 'import_file_component' is not defined