In [1]:
from google.cloud import aiplatform
import os

In [2]:
# Respect credentials that are already configured outside the notebook. Only
# fall back to the local service-account key when nothing is set AND the file
# exists; otherwise the Google client libraries use Application Default
# Credentials instead of failing on a missing L: drive path.
_SA_KEY_FILE = "L:\\gcp-practice-0123-bd3866a21b22.json"
if 'GOOGLE_APPLICATION_CREDENTIALS' not in os.environ and os.path.isfile(_SA_KEY_FILE):
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = _SA_KEY_FILE

In [3]:
# Project-wide configuration used by the cells below.
PROJECT_ID   = 'gcp-practice-0123'
REGION       = 'us-central1'
MACHINE_TYPE = 'e2-standard-4'
BUCKET       = 'gcp-practice-0123-18jun2023'
# aiplatform.init(staging_bucket=...) expects a full GCS URI ("gs://..."); the
# previous value was a bare bucket path without the scheme.
BUCKET_URI   = f'gs://{BUCKET}/custom-trained-model/model/'

In [4]:
# Point the Vertex AI SDK at this project, region and staging location; later
# SDK calls in this kernel inherit these defaults.
aiplatform.init(
    staging_bucket=BUCKET_URI,
    location=REGION,
    project=PROJECT_ID,
)

In [5]:
import kfp
from kfp.v2 import dsl,compiler
from kfp.v2.dsl import component,pipeline,Output,Artifact,Input,Model

CUSTOM_SERVING_CONTAINER = (
    "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest"
)


@component(
    base_image=CUSTOM_SERVING_CONTAINER,
    packages_to_install=['google-cloud-aiplatform', 'protobuf==3.20.3'],
    output_component_file='E:\\Python-code\\VertexAI-MLPipelines\\yaml\\create_endpoint.yaml',
)
def create_endpoint(endpoint_display_name_in: str,
                    project_in: str) -> str:
    """Create a Vertex AI endpoint and return its resource name.

    The body executes inside the component container, so the SDK import is
    kept local to the function.

    Args:
        endpoint_display_name_in: Display name for the new endpoint.
        project_in: GCP project in which the endpoint is created.

    Returns:
        The full resource name of the created endpoint.
    """
    print("\n\n***Creating Endpoint....")
    from google.cloud import aiplatform as vertex

    vertex.init(project=project_in)
    new_endpoint = vertex.Endpoint.create(
        display_name=endpoint_display_name_in,
        project=project_in,
    )
    print("***The End Point Project is : ", new_endpoint.project)
    print("***The Private End point resource name is : ", new_endpoint.resource_name)
    return new_endpoint.resource_name

In [6]:
import kfp
from kfp.v2 import dsl,compiler
from kfp.v2.dsl import component,pipeline,Output,Artifact,Input,Model


CUSTOM_SERVING_CONTAINER = (
    "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest"
)


@component(
    base_image=CUSTOM_SERVING_CONTAINER,
    packages_to_install=['google-cloud-aiplatform', 'protobuf==3.20.3'],
    output_component_file='E:\\Python-code\\VertexAI-MLPipelines\\yaml\\deploy_model.yaml',
)
def deploy_model(project_in: str,
                 model_display_name_in: str,
                 model_resource_nm_in: str,
                 endpoint_in: str):
    """Deploy a registered model onto an existing endpoint.

    Args:
        project_in: GCP project passed to ``aiplatform.init``.
        model_display_name_in: Display name given to the deployed model.
        model_resource_nm_in: Resource name of the model to deploy.
        endpoint_in: Resource name of the target endpoint.
    """
    print("\n\n***Deploying Model....")
    from google.cloud import aiplatform as vertex

    vertex.init(project=project_in)
    model = vertex.Model(model_resource_nm_in)
    target_endpoint = vertex.Endpoint(endpoint_in)
    # A single fixed replica. Per the Vertex AI SDK docs, traffic_split key "0"
    # denotes the model being deployed in this call, so it gets all traffic.
    model.deploy(
        endpoint=target_endpoint,
        deployed_model_display_name=model_display_name_in,
        min_replica_count=1,
        max_replica_count=1,
        traffic_split={"0": 100},
    )

In [7]:
import kfp
from kfp.v2 import dsl,compiler
from kfp.v2.dsl import component,pipeline,Output,Artifact,Input,Model

CUSTOM_SERVING_CONTAINER = (
    "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest"
)


@component(
    base_image=CUSTOM_SERVING_CONTAINER,
    packages_to_install=['google-cloud-aiplatform', 'protobuf==3.20.3'],
    output_component_file='E:\\Python-code\\VertexAI-MLPipelines\\yaml\\register_model.yaml',
)
def register_model(project_in: str,
                   model_display_name_in: str,
                   model_gcs_path_in: str,
                   serving_container_in: str) -> str:
    """Upload saved model artifacts from GCS as a Vertex AI Model.

    Args:
        project_in: GCP project that will own the model.
        model_display_name_in: Display name for the uploaded model.
        model_gcs_path_in: GCS folder holding the model artifacts.
        serving_container_in: Serving container image URI for predictions.

    Returns:
        The resource name of the uploaded model.
    """
    print("\n\n***Registering Model....")
    from google.cloud import aiplatform as vertex

    vertex.init(project=project_in)
    upload_kwargs = {
        "project": project_in,
        "display_name": model_display_name_in,
        "serving_container_image_uri": serving_container_in,
        "artifact_uri": model_gcs_path_in,
    }
    registered = vertex.Model.upload(**upload_kwargs)
    print("***Model Resource Name is : ", registered.resource_name)
    return registered.resource_name

In [8]:
import kfp
from kfp.v2 import dsl,compiler
from kfp.v2.dsl import component,pipeline,Output,Artifact,Input,Model

CUSTOM_SERVING_CONTAINER = (
    "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest"
)


@component(
    base_image=CUSTOM_SERVING_CONTAINER,
    output_component_file='E:\\Python-code\\VertexAI-MLPipelines\\yaml\\batch_prediction.yaml',
    # `datetime` is part of the standard library; installing the PyPI
    # "datetime" package (Zope DateTime) is unnecessary.
    packages_to_install=['google-cloud-aiplatform', 'protobuf==3.20.3'],
)
def batch_prediction(project_in: str,
                     model_display_name_in: str,
                     model_resource_nm_in: str,
                     bigquery_source_in: str,
                     bigquery_destination_prefix_in: str,
                     predictions_format_in: str,
                     machine_type_in: str
                     ) -> str:
    """Run a batch prediction job that reads from and writes to BigQuery.

    Args:
        project_in: GCP project passed to ``aiplatform.init``.
        model_display_name_in: Used as the batch prediction job display name.
        model_resource_nm_in: Resource name of the model to score with.
        bigquery_source_in: Input table id, with or without a ``bq://`` prefix.
        bigquery_destination_prefix_in: Output location, with or without a
            ``bq://`` prefix. A ``-YYYYmmddHHMMSS`` suffix is appended so each
            run targets a new location.
        predictions_format_in: Output format forwarded to ``batch_predict``.
        machine_type_in: Machine type forwarded to ``batch_predict``.

    Returns:
        The timestamped destination, without the ``bq://`` prefix, so it can be
        passed directly to BigQuery client calls.
    """
    print("\n\nDoing Batch Prediction...")
    from google.cloud import aiplatform
    from datetime import datetime

    bq_scheme = 'bq://'

    aiplatform.init(project=project_in)
    model_obj = aiplatform.Model(model_resource_nm_in)

    # Add the scheme only when missing, so callers may pass either form without
    # getting "bq://bq://..." or a truncated result below.
    if not bigquery_source_in.startswith(bq_scheme):
        bigquery_source_in = bq_scheme + bigquery_source_in
    if not bigquery_destination_prefix_in.startswith(bq_scheme):
        bigquery_destination_prefix_in = bq_scheme + bigquery_destination_prefix_in
    bigquery_destination_prefix_in = (
        bigquery_destination_prefix_in + '-' + datetime.now().strftime('%Y%m%d%H%M%S')
    )
    print('******bigquery_source_in : ', bigquery_source_in)
    print('******bigquery_destination_prefix_in : ', bigquery_destination_prefix_in)

    model_obj.batch_predict(job_display_name=model_display_name_in,
                            bigquery_source=bigquery_source_in,
                            bigquery_destination_prefix=bigquery_destination_prefix_in,
                            predictions_format=predictions_format_in,
                            machine_type=machine_type_in)

    # Strip exactly the scheme we guaranteed above (not a blind 5-char slice).
    return bigquery_destination_prefix_in[len(bq_scheme):]

In [9]:
import kfp
from kfp.v2 import dsl,compiler
from kfp.v2.dsl import component,pipeline,Output,Artifact,Input,Model

CUSTOM_SERVING_CONTAINER = (
    "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest"
)


@component(
    base_image=CUSTOM_SERVING_CONTAINER,
    output_component_file='E:\\Python-code\\VertexAI-MLPipelines\\yaml\\loadBigQueryTableFromGCS.yaml',
    # `datetime` is stdlib; the PyPI "datetime" package is not needed.
    packages_to_install=['google-cloud-aiplatform', 'google-cloud-bigquery', 'protobuf==3.20.3'],
)
def loadBigQueryTableFromGCS(project_in: str,
                             bucket_uri_in: str) -> str:
    """Load ``california_housing_train.csv`` from GCS into a new BigQuery table.

    Args:
        project_in: GCP project that runs the load job and owns the new table.
        bucket_uri_in: GCS folder URI containing the CSV; a trailing ``/`` is
            tolerated.

    Returns:
        Fully-qualified id (``<project_in>.dataset2.<table>``) of the new table.
    """
    from google.cloud import bigquery
    from datetime import datetime

    client = bigquery.Client(project=project_in, location='us-central1')
    column_names = [
        "longitude", "latitude", "housing_median_age", "total_rooms",
        "total_bedrooms", "population", "households", "median_income",
        "median_house_value",
    ]
    job_config = bigquery.LoadJobConfig(
        schema=[bigquery.SchemaField(name, "FLOAT") for name in column_names],
        skip_leading_rows=1,  # skip the CSV header row
        source_format=bigquery.SourceFormat.CSV,
    )
    uri = bucket_uri_in.rstrip('/') + '/california_housing_train.csv'
    # Create the table in the caller's project instead of a hard-coded one.
    table_id = (f'{project_in}.dataset2.california_housing_test_1_'
                + datetime.now().strftime('%Y%m%d%H%M%S'))
    load_job = client.load_table_from_uri(source_uris=uri,
                                          destination=table_id,
                                          job_config=job_config)
    load_job.result()  # block until the load completes so the table exists downstream
    return table_id

In [10]:
import kfp
from kfp.v2 import dsl,compiler
from kfp.v2.dsl import component,pipeline,Output,Artifact,Input,Model

CUSTOM_SERVING_CONTAINER = (
    "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest"
)


@component(
    base_image=CUSTOM_SERVING_CONTAINER,
    output_component_file='E:\\Python-code\\VertexAI-MLPipelines\\yaml\\preprocessBigQueryTable.yaml',
    # `datetime` is stdlib; the PyPI "datetime" package is not needed.
    packages_to_install=['google-cloud-aiplatform', 'google-cloud-bigquery', 'protobuf==3.20.3'],
)
def preprocessBigQueryTable(project_in: str,
                            source_bq_table_id_in: str) -> str:
    """Create a scoring table holding only the feature columns of the source.

    Args:
        project_in: GCP project that runs the query and owns the new table.
        source_bq_table_id_in: Fully-qualified id of the table to read from.

    Returns:
        Fully-qualified id (``<project_in>.dataset2.<table>``) of the new table.
    """
    from google.cloud import bigquery
    from datetime import datetime

    client = bigquery.Client(project=project_in, location='us-central1')
    target_table_id = (f'{project_in}.dataset2.california_housing_test_2_'
                       + datetime.now().strftime('%Y%m%d%H%M%S'))
    feature_columns = ("longitude,latitude,housing_median_age,total_rooms,"
                       "total_bedrooms,population,households,median_income")
    # Table ids cannot be bound as query parameters; backtick-quote them so
    # hyphenated project/table names parse as a single identifier.
    query_str = (f"CREATE TABLE `{target_table_id}` AS SELECT {feature_columns} "
                 f"FROM `{source_bq_table_id_in}`")
    print('*******Query is : ', query_str)
    bigquery_query_job = client.query(query=query_str)
    # client.query() only starts the job. Wait for it (and surface any error)
    # so the table exists before the pipeline's batch prediction step reads it.
    bigquery_query_job.result()
    print('*******The state of the job is : ', bigquery_query_job.state)
    return target_table_id

In [11]:
import kfp
from kfp.v2 import dsl,compiler
from kfp.v2.dsl import component,pipeline,Output,Artifact,Input,Model

CUSTOM_SERVING_CONTAINER = (
    "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest"
)


@component(
    base_image=CUSTOM_SERVING_CONTAINER,
    output_component_file='E:\\Python-code\\VertexAI-MLPipelines\\yaml\\loadScoresFromBigQueryTableToGCS.yaml',
    packages_to_install=['google-cloud-aiplatform', 'google-cloud-bigquery', 'protobuf==3.20.3'],
)
def loadScoresFromBigQueryTableToGCS(project_in: str,
                                     source_bq_table_id_in: str,
                                     target_bucket_uri_in: str) -> str:
    """Export a BigQuery table (the prediction scores) to a GCS object.

    Args:
        project_in: GCP project that runs the extract job.
        source_bq_table_id_in: Fully-qualified id of the table to export.
        target_bucket_uri_in: Destination ``gs://`` URI for the export.

    Returns:
        ``target_bucket_uri_in``. The component is declared ``-> str``, so it
        must return a string for its output parameter; returning ``None`` would
        fail the step after the export had already succeeded.
    """
    from google.cloud import bigquery

    client = bigquery.Client(project=project_in, location='us-central1')
    extract_job = client.extract_table(source=source_bq_table_id_in,
                                       destination_uris=target_bucket_uri_in)
    extract_job.result()  # wait for the export to finish (raises on failure)
    return target_bucket_uri_in

In [12]:
import kfp
from kfp.v2 import dsl,compiler
from kfp.v2.dsl import component,pipeline,Output,Artifact,Input,Model

# Pipeline root baked into the compiled spec. The run cell passes its own GCS
# pipeline_root to PipelineJob, which is what the submitted run uses.
pipeline_root_path = "E:\\Python-code\\VertexAI-MLPipelines\\json\\"


@dsl.pipeline(
    name="mlopsdeploypipeline-27jan2023",
    pipeline_root=pipeline_root_path,
)
def mlopsDeployPipeline(project_in: str,
                        endpoint_display_name_in: str,
                        model_gcs_path_in: str,
                        serving_container_in: str,
                        bigquery_destination_prefix_in: str,
                        predictions_format_in: str,
                        machine_type_in: str,
                        bucket_uri_in: str,
                        score_target_bucket_uri_in: str):
    """Register and deploy a model, then batch-score BigQuery data with it.

    Task ordering comes from data dependencies only: batch prediction consumes
    the registered model's resource name, not the deployment, so it does not
    wait for ``deploy_model``.
    """
    # --- Deployment branch: endpoint + registered model -> deployed model ---
    endpoint_task = create_endpoint(
        endpoint_display_name_in=endpoint_display_name_in,
        project_in=project_in,
    )
    model_task = register_model(
        project_in=project_in,
        model_display_name_in=endpoint_display_name_in,
        model_gcs_path_in=model_gcs_path_in,
        serving_container_in=serving_container_in,
    )
    deploy_model(
        project_in=project_in,
        model_display_name_in=endpoint_display_name_in,
        model_resource_nm_in=model_task.output,
        endpoint_in=endpoint_task.output,
    )

    # --- Scoring branch: GCS CSV -> BQ table -> feature table -> scores -> GCS ---
    raw_table_task = loadBigQueryTableFromGCS(
        project_in=project_in,
        bucket_uri_in=bucket_uri_in,
    )
    features_table_task = preprocessBigQueryTable(
        project_in=project_in,
        source_bq_table_id_in=raw_table_task.output,
    )
    predictions_task = batch_prediction(
        project_in=project_in,
        model_display_name_in=endpoint_display_name_in,
        model_resource_nm_in=model_task.output,
        bigquery_source_in=features_table_task.output,
        bigquery_destination_prefix_in=bigquery_destination_prefix_in,
        predictions_format_in=predictions_format_in,
        machine_type_in=machine_type_in,
    )
    loadScoresFromBigQueryTableToGCS(
        project_in=project_in,
        source_bq_table_id_in=predictions_task.output,
        target_bucket_uri_in=score_target_bucket_uri_in,
    )

In [13]:
import kfp
from kfp.v2 import dsl,compiler
from kfp.v2.dsl import component,pipeline,Output,Artifact,Input,Model

# Where the compiled pipeline spec is written locally, and the GCS object it is
# uploaded to before running.
JSON_LOCAL_PATH = "E:\\Python-code\\VertexAI-MLPipelines\\json\\mlopsdeploypipeline-1.json"
JSON_GCS_PATH = 'gs://gcp-practice-0123-18jun2023/docker-kfp-test/kfp-json/mlopsdeploypipeline-1.json'

# Serialize the pipeline definition into a pipeline-spec JSON file.
compiler.Compiler().compile(
    package_path=JSON_LOCAL_PATH,
    pipeline_func=mlopsDeployPipeline,
)



In [None]:
# Upload the compiled pipeline JSON from the local disk to the GCS bucket.

In [14]:
from google.cloud import aiplatform,storage

# Upload the compiled spec to GCS. Both ends come from the constants defined in
# the compile cell, so the uploaded object cannot drift from the paths used
# for compiling and running.
storage_client = storage.Client(project=PROJECT_ID)
blob = storage.Blob.from_string(JSON_GCS_PATH, client=storage_client)
blob.upload_from_filename(filename=JSON_LOCAL_PATH)

In [None]:
# Submit the compiled pipeline to Vertex AI Pipelines.

In [15]:
from google.cloud import aiplatform

# Settings for submitting the pipeline run.
PROJECT_ID = 'gcp-practice-0123'
# Prebuilt Vertex AI scikit-learn CPU serving image for the registered model.
PREBUILT_SERVING_CONTAINER = (
    "us-docker.pkg.dev/vertex-ai/prediction/"
    "sklearn-cpu.1-0:latest"
)
# NOTE: rebinds the name used by the pipeline-definition cell; from here on it
# holds the GCS root passed to PipelineJob(pipeline_root=...).
pipeline_root_path = 'gs://gcp-practice-0123-18jun2023' + '/test-kfp/kfp/'

In [16]:
# Build the run from the uploaded template. The template path, region and
# machine type reuse the config constants (same values as before) instead of
# repeating literals that could drift from the cells that define them.
mlops_pipeline_job = aiplatform.PipelineJob(
    display_name="model-deployment-pipeline-test-json",
    pipeline_root=pipeline_root_path,
    template_path=JSON_GCS_PATH,
    project=PROJECT_ID,
    location=REGION,
    enable_caching=False,
    parameter_values={
        "project_in": PROJECT_ID,
        "endpoint_display_name_in": "mlopsDeployPipeline-27jan2023",
        "model_gcs_path_in": 'gs://gcp-practice-0123-18jun2023/custom-trained-model/model/aiplatform-custom-training-2023-01-22-18:07:17.227/model/',
        "serving_container_in": PREBUILT_SERVING_CONTAINER,
        "bigquery_destination_prefix_in": 'bq://gcp-practice-0123.dataset2.kfp-predictions',
        "predictions_format_in": "bigquery",
        "machine_type_in": MACHINE_TYPE,
        "bucket_uri_in": "gs://gcp-practice-0123-18jun2023/custom-trained-model/training-data",
        "score_target_bucket_uri_in": "gs://gcp-practice-0123-18jun2023/docker-kfp-test/kfp-score/kfp-scores-28jan2023.csv",
    },
)

# To run the pipeline as a specific identity, pass service_account=... here.
mlops_pipeline_job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/553411498874/locations/us-central1/pipelineJobs/mlopsdeploypipeline-27jan2023-20230128164135
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/553411498874/locations/us-central1/pipelineJobs/mlopsdeploypipeline-27jan2023-20230128164135')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/mlopsdeploypipeline-27jan2023-20230128164135?project=553411498874


In [14]:
#!pip list

Package                       Version
----------------------------- --------------------
absl-py                       0.11.0
aiohttp                       3.8.3
aiosignal                     1.3.1
alabaster                     0.7.12
alembic                       1.9.1
anaconda-client               1.11.0
anaconda-navigator            2.3.1
anaconda-project              0.11.1
anyio                         3.5.0
appdirs                       1.4.4
argon2-cffi                   21.3.0
argon2-cffi-bindings          21.2.0
arrow                         1.2.2
astroid                       2.11.7
astropy                       5.1
astunparse                    1.6.3
async-timeout                 4.0.2
atomicwrites                  1.4.0
attrs                         21.4.0
Automat                       20.2.0
autopep8                      1.6.0
Babel                         2.9.1
backcall                      0.2.0
backports.functools-lru-cache 1.6.4
backports.tempfile            1.0
backpo

pathspec                      0.9.0
patsy                         0.5.2
pep8                          1.7.1
pexpect                       4.8.0
pickleshare                   0.7.5
Pillow                        9.2.0
pip                           22.2.2
pkginfo                       1.8.2
platformdirs                  2.5.2
plotly                        5.9.0
pluggy                        1.0.0
poyo                          0.5.0
prometheus-client             0.14.1
prompt-toolkit                3.0.20
Protego                       0.1.16
proto-plus                    1.22.1
protobuf                      3.20.3
psutil                        5.9.0
ptyprocess                    0.7.0
py                            1.11.0
pyarrow                       10.0.1
pyasn1                        0.4.8
pyasn1-modules                0.2.8
pycodestyle                   2.8.0
pycosat                       0.6.3
pycparser                     2.21
pyct                          0.4.8
pycurl               