# Create and run a pipeline in Vertex AI

In [None]:
# Automatically reload edited local modules (e.g. src.common.utils) before each cell executes.
%load_ext autoreload
%autoreload 2

TFX Version: 1.12.0  
Tensorflow Version: 2.11.0

In [None]:
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from google.cloud import storage
import tensorflow as tf
from tfx import v1 as tfx
from tfx.proto import example_gen_pb2
from tfx.extensions.google_cloud_big_query.example_gen.component import BigQueryExampleGen
from tfx.dsl.components.common import resolver
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.dsl.experimental import latest_blessed_model_resolver
import tensorflow_model_analysis as tfma
from typing import List, Optional
from tfx.orchestration.kubeflow.v2 import kubeflow_v2_dag_runner
import tensorflow_transform as tft
import pandas as pd
import os
import logging
# Show INFO-level log records (TFX/Beam progress) and report the library versions in use.
logging.root.setLevel(logging.INFO)
for _label, _version in (("TFX Version:", tfx.__version__), ("Tensorflow Version:", tf.__version__)):
    print(_label, _version)

In [None]:
import sys

# Make the repository root importable so `from src.common import utils` works.
# Guarded so that re-running this cell does not append duplicate entries to sys.path.
if "../.." not in sys.path:
    sys.path.append("../..")

In [None]:
from src.common import utils

<b>Update variables in config.yml before running the next cell</b>

In [3]:
# Load the project settings from config.yml (the loaded values are printed below).
config = utils.load_config()

Project ID: strategy-bi-ltd
Region: europe-west1
Bucket name: mlops-mlops
Service Account: mlops-compute@strategy-bi-ltd.iam.gserviceaccount.com
Vertex API Parent URI: projects/strategy-bi-ltd/locations/europe-west1
Version: v04
Dataset display name: baseline
Model display name: baseline-classifier-v04
Workspace: gs://mlops-mlops/baseline
Raw schema dir: raw_schema
Module file local path: model.py
Module file GCP path: gs://mlops-mlops/baseline/pipeline_module/baseline-classifier-v04-train-pipeline/pipeline_root/baseline-classifier-v04-train-pipeline/model.py
MLMD sqllite: mlmd.sqllite
Artifact store: gs://mlops-mlops/baseline\tfx_artifacts_interactive
Model registry: gs://mlops-mlops/baseline\model_registry
Pipeline name: baseline-classifier-v04-train-pipeline
Pipeline root: gs://mlops-mlops/baseline\tfx_artifacts_interactive\baseline-classifier-v04-train-pipeline
Pipeline definition file: baseline.json


<b>Copy module file to GCP bucket.</b>

In [None]:
# Upload the local module file so the compiled Vertex pipeline (module_file=config["MODULE_PATH"]) can read it.
!gsutil cp {config["MODULE_FILE"]} {config["MODULE_PATH"]}

In [None]:
# Presumably deletes artefacts left by earlier runs of this notebook. TODO confirm exactly what
# src/common/utils removes before running this against a shared bucket.
utils.remove_previous_artefacts(config)

<b>Initialise an interactive TFX context (the AI Platform client initialisation is commented out)</b>

In [None]:
# Vertex AI client initialisation is not required here: the PipelineJob cell at the end of the
# notebook passes project and location explicitly.
# aiplatform.init(project=config["PROJECT"], location=config["REGION"])
# Context used below to run components one at a time (context.run / context.show);
# presumably a TFX InteractiveContext built from config.
context = utils.create_interactive_context(config)

<b>Declare the source query</b>

In [None]:
# Training rows are those with rn <= 130500; the held-out test query uses the complementary range.
sql_query = "SELECT * FROM `{table}` WHERE rn <= {boundary}".format(
    table="strategy-bi-ltd.ml.training_baseline_frames_merged3",
    boundary=130500,
)

<b>Create component functions</b>

This function creates a BigQueryExampleGen component that splits the query result into train and eval sets (9:1 hash buckets).

In [None]:
def _get_bigquery_example_gen(sql_query, splits=None):
    """Create a BigQueryExampleGen that turns the result of ``sql_query`` into hash-split examples.

    Args:
        sql_query: BigQuery query whose result rows become the examples.
        splits: Optional list of ``(split_name, hash_buckets)`` pairs. Defaults to
            ``[("train", 9), ("eval", 1)]``, i.e. roughly a 90/10 train/eval split.
            Pass e.g. ``[("test", 1)]`` to write every row to a single split.

    Returns:
        A BigQueryExampleGen component.
    """
    if splits is None:
        splits = [("train", 9), ("eval", 1)]
    output_config = example_gen_pb2.Output(
        split_config=example_gen_pb2.SplitConfig(
            splits=[
                example_gen_pb2.SplitConfig.Split(name=name, hash_buckets=buckets)
                for name, buckets in splits
            ]
        )
    )
    return BigQueryExampleGen(query=sql_query, output_config=output_config, custom_config=example_gen_pb2.CustomConfig())


This cell creates the training ExampleGen component from `sql_query`; the next cell runs it.

In [None]:
# Training examples: rows returned by sql_query, hash-split into train/eval.
example_gen = _get_bigquery_example_gen(sql_query)

In [None]:
# Run the BigQuery extraction with the Beam args from config; caching is disabled, so the
# query is executed again every time this cell runs.
context.run(example_gen, beam_pipeline_args=config['BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS'], enable_cache=False)

In [None]:
# Held-out test rows: the complement (rn > 130500) of the training query, read from the same
# table and project as sql_query (previously pointed at a different project).
test_sql_query = "SELECT * FROM `strategy-bi-ltd.ml.training_baseline_frames_merged3` WHERE rn > 130500"

In [None]:
# Build a second BigQueryExampleGen over the held-out rows. Every row goes into a single
# "test" split, which the Evaluator reads via example_splits=['test'].
output_config = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(
        splits=[
            example_gen_pb2.SplitConfig.Split(name="test", hash_buckets=1),
        ]
    )
)

test_example_gen = BigQueryExampleGen(query=test_sql_query, output_config=output_config, custom_config=example_gen_pb2.CustomConfig())

# GCS URIs always use "/". os.path.join uses the local OS separator, and on Windows (note the
# backslashes in the config paths printed above) that yields an invalid gs:// temp_location.
beam_pipeline_args = [
    f"--project={config['PROJECT']}",
    f"--temp_location={config['WORKSPACE'].rstrip('/')}/tmp",
]

context.run(
    test_example_gen,
    beam_pipeline_args=beam_pipeline_args,
    enable_cache=False
)

This function creates a StatisticsGen component.

In [None]:
def _get_statistics_gen(example_gen):
    """Create a StatisticsGen over the examples produced by ``example_gen``.

    Args:
        example_gen: Upstream example-gen component; its ``examples`` output is consumed.

    Returns:
        A ``tfx.components.StatisticsGen`` component.
    """
    examples_channel = example_gen.outputs['examples']
    return tfx.components.StatisticsGen(examples=examples_channel)

In [None]:
# Compute statistics over the training examples extracted above.
statistics_gen = _get_statistics_gen(example_gen)
context.run(statistics_gen)

This function creates a SchemaGen component.

In [None]:
def _get_schema_gen(statistics_gen):
    """Create a SchemaGen that infers a schema from ``statistics_gen``'s statistics.

    Args:
        statistics_gen: Upstream StatisticsGen; its ``statistics`` output is consumed.

    Returns:
        A ``tfx.components.SchemaGen`` component with feature-shape inference enabled.
    """
    statistics_channel = statistics_gen.outputs['statistics']
    schema_gen = tfx.components.SchemaGen(
        statistics=statistics_channel,
        infer_feature_shape=True,  # also infer fixed feature shapes, not just types
    )
    return schema_gen

In [None]:
# Infer the data schema from the statistics.
schema_gen = _get_schema_gen(statistics_gen)
context.run(schema_gen)

This function creates an ExampleValidator component.

In [None]:
def _get_example_validator(statistics_gen, schema_gen):
    """Create an ExampleValidator that checks the statistics against the inferred schema.

    Args:
        statistics_gen: StatisticsGen whose ``statistics`` output is validated.
        schema_gen: SchemaGen whose ``schema`` output is the reference.

    Returns:
        A ``tfx.components.ExampleValidator`` component.
    """
    validator_inputs = {
        'statistics': statistics_gen.outputs['statistics'],
        'schema': schema_gen.outputs['schema'],
    }
    return tfx.components.ExampleValidator(**validator_inputs)

In [None]:
# Check the training statistics against the inferred schema.
example_validator = _get_example_validator(statistics_gen, schema_gen)
context.run(example_validator)

In [None]:
# Display the anomalies reported by ExampleValidator.
context.show(example_validator.outputs['anomalies'])

This function creates a Transform component.

In [None]:
def _get_transform(example_gen, schema_gen, module_file):
    """Create a Transform component driven by the preprocessing code in ``module_file``.

    Args:
        example_gen: Component whose ``examples`` output is transformed.
        schema_gen: Component whose ``schema`` output describes the raw features.
        module_file: Path/URI of the user module with the preprocessing code.

    Returns:
        A ``tfx.components.Transform`` component.
    """
    examples_channel = example_gen.outputs['examples']
    schema_channel = schema_gen.outputs['schema']
    return tfx.components.Transform(examples=examples_channel, schema=schema_channel, module_file=module_file)

In [None]:
# Run the preprocessing from the local module file (config['MODULE_FILE']); the compiled Vertex
# pipeline further down uses the uploaded copy (config['MODULE_PATH']) instead.
transform = _get_transform(example_gen, schema_gen, config['MODULE_FILE'])
context.run(transform, enable_cache=False)

This function creates a Trainer component.

In [None]:
def _get_trainer(module_file, transform, schema_gen, train_steps=10000, eval_steps=5000):
    """Create a Trainer that trains on the transformed examples using ``module_file``.

    Args:
        module_file: Path/URI of the user module with the training code.
        transform: Transform component providing ``transformed_examples`` and ``transform_graph``.
        schema_gen: SchemaGen component providing the raw-data ``schema``.
        train_steps: Number of training steps (default 10000, the previously hard-coded value).
        eval_steps: Number of evaluation steps (default 5000, the previously hard-coded value).

    Returns:
        A ``tfx.components.Trainer`` component.
    """
    return tfx.components.Trainer(
        module_file=module_file,
        examples=transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        schema=schema_gen.outputs['schema'],
        train_args=trainer_pb2.TrainArgs(num_steps=train_steps),
        eval_args=trainer_pb2.EvalArgs(num_steps=eval_steps),
        # NOTE(review): the value is the literal placeholder 'labels_path'; confirm what the
        # module file's training function expects for this key.
        custom_config={'labels_path': 'labels_path'})

In [None]:
# Train on the transformed examples using the local module file.
trainer = _get_trainer(config['MODULE_FILE'], transform, schema_gen)
context.run(trainer, enable_cache=False)

This function creates a Resolver component that selects the latest blessed model as the evaluation baseline.

In [None]:
def _get_model_resolver():
    """Create a Resolver node that selects the latest blessed Model as the evaluation baseline.

    Returns:
        A Resolver node with id ``'latest_blessed_model_resolver'`` exposing ``model`` and
        ``model_blessing`` outputs.
    """
    resolver_node = resolver.Resolver(
        strategy_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
        model=Channel(type=Model),
        model_blessing=Channel(type=ModelBlessing),
    )
    return resolver_node.with_id('latest_blessed_model_resolver')

In [None]:
# Resolve the baseline model for evaluation. NOTE(review): presumably resolves nothing on the
# first run, before any model has been blessed — confirm the Evaluator handles that case.
latest_model_resolver = _get_model_resolver()
context.run(latest_model_resolver, enable_cache=False)

This function creates an Evaluator component.

In [None]:
def _get_evaluator(test_example_gen, trainer, model_resolver, accuracy_threshold, schema_gen=None):
    """Create an Evaluator that scores the trained model on the held-out "test" split.

    The eval config validates BinaryAccuracy with a value lower bound of ``accuracy_threshold``
    and a HIGHER_IS_BETTER change threshold of -1e-3 against the baseline from ``model_resolver``.
    The label key is read from the notebook-global ``config['LABEL']``.

    Args:
        test_example_gen: Example-gen component whose ``examples`` output contains a "test" split.
        trainer: Trainer component whose ``model`` output is evaluated.
        model_resolver: Resolver whose ``model`` output is used as the baseline model.
        accuracy_threshold: Lower bound on BinaryAccuracy.
        schema_gen: Optional SchemaGen providing the schema. When omitted, the schema channel
            that ``trainer`` was built with is reused.

    Returns:
        A ``tfx.components.Evaluator`` component.
    """
    if schema_gen is not None:
        schema_channel = schema_gen.outputs['schema']
    else:
        # Reuse the trainer's schema input instead of a notebook-global ``schema_gen``: inside
        # _create_pipeline that global is the interactive SchemaGen, not the pipeline's own one.
        schema_channel = trainer.inputs['schema']

    eval_config = tfma.EvalConfig(
        model_specs=[tfma.ModelSpec(label_key=config['LABEL'], signature_name='serving_default')],
        slicing_specs=[tfma.SlicingSpec()],
        metrics_specs=[
            tfma.MetricsSpec(metrics=[
                tfma.MetricConfig(
                    class_name='BinaryAccuracy',
                    threshold=tfma.MetricThreshold(
                        value_threshold=tfma.GenericValueThreshold(
                            lower_bound={'value': accuracy_threshold}),
                        change_threshold=tfma.GenericChangeThreshold(
                            direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                            absolute={'value': -1e-3})))
                ])
            ])
    return tfx.components.Evaluator(
        examples=test_example_gen.outputs['examples'],
        example_splits=['test'],
        model=trainer.outputs['model'],
        baseline_model=model_resolver.outputs['model'],
        eval_config=eval_config,
        schema=schema_channel)

In [None]:
# Evaluate the trained model on the held-out test split.
# NOTE(review): _create_pipeline defaults accuracy_threshold to 0.35, so the compiled pipeline
# applies a lower bar than this interactive 0.5 check — confirm which threshold is intended.
evaluator = _get_evaluator(test_example_gen, trainer, latest_model_resolver, accuracy_threshold=0.5)
context.run(evaluator, enable_cache=False)

In [None]:
# Report whether the model passed validation, then print every scalar metric for the first
# slice (with the single tfma.SlicingSpec() configured, that is the overall slice).
evaluation_results = evaluator.outputs['evaluation'].get()[0].uri
print("validation_ok:", tfma.load_validation_result(evaluation_results).validation_ok, '\n')

overall_slice_metrics = next(iter(tfma.load_metrics(evaluation_results)))
for entry in overall_slice_metrics.metric_keys_and_values:
    # Test for the presence of double_value rather than its truthiness, so genuine 0.0
    # metrics are printed instead of silently skipped.
    if entry.value.HasField("double_value"):
        print(entry.key.name, ":", round(entry.value.double_value.value, 3))

In [None]:
# List the serving SavedModel exported by the interactive Trainer run.
# NOTE(review): the `10` path segment (presumably the Trainer execution id) is hard-coded — confirm it matches the run above.
!gsutil ls gs://mlops-mlops/baseline/tfx_artifacts_interactive/baseline-classifier-v04-train-pipeline/Trainer/model/10/Format-Serving

In [None]:
# Copy the serving model into the versioned location v01 in the gs://sky-mlops-dev bucket.
# NOTE(review): the source path hard-codes the `10` segment of the v04 Trainer output — confirm it is the evaluated run.
!gsutil cp -r gs://mlops-mlops/baseline/tfx_artifacts_interactive/baseline-classifier-v04-train-pipeline/Trainer/model/10/Format-Serving/* gs://sky-mlops-dev/model-000/v01

In [None]:
# Copy the same serving model into the "live" location.
# NOTE(review): this copies the Trainer output directly; nothing here checks the validation_ok printed by
# the evaluation cell, so a model that failed evaluation would still be promoted.
!gsutil cp -r gs://mlops-mlops/baseline/tfx_artifacts_interactive/baseline-classifier-v04-train-pipeline/Trainer/model/10/Format-Serving/* gs://sky-mlops-dev/model-000/live

This function creates a Pusher component.

In [None]:
def _get_pusher(trainer, evaluator, serving_model_dir):
    """Create a Pusher that exports the trained model to ``serving_model_dir``.

    The push is tied to the Evaluator's ``blessing`` output.

    Args:
        trainer: Trainer component whose ``model`` output is pushed.
        evaluator: Evaluator component whose ``blessing`` output gates the push.
        serving_model_dir: Filesystem/GCS base directory that receives the model.

    Returns:
        A ``tfx.components.Pusher`` component.
    """
    destination = pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(base_directory=serving_model_dir))
    return tfx.components.Pusher(
        model=trainer.outputs['model'],
        model_blessing=evaluator.outputs['blessing'],
        push_destination=destination)

This function assembles the full TFX pipeline from the component functions above.

In [None]:
def _get_test_bigquery_example_gen(test_sql_query: str):
    """Create the held-out BigQueryExampleGen that writes every row of ``test_sql_query`` to one "test" split.

    It gets its own node id because the training BigQueryExampleGen in the same pipeline already
    uses the default id, and pipeline node ids must be unique.
    """
    output_config = example_gen_pb2.Output(
        split_config=example_gen_pb2.SplitConfig(
            splits=[example_gen_pb2.SplitConfig.Split(name="test", hash_buckets=1)]
        )
    )
    return BigQueryExampleGen(
        query=test_sql_query,
        output_config=output_config,
        custom_config=example_gen_pb2.CustomConfig(),
    ).with_id("TestBigQueryExampleGen")


def _create_pipeline(pipeline_name: str, 
                     pipeline_root: str, 
                     module_file: str, 
                     serving_model_dir: str,
                     beam_pipeline_args: Optional[List[str]],
                     sql_query: str,
                     test_sql_query: str,
                     accuracy_threshold: float = 0.35
                     ) -> tfx.dsl.Pipeline:
    """Assemble the end-to-end training pipeline.

    Args:
        pipeline_name: Name of the pipeline.
        pipeline_root: Root URI under which pipeline artifacts are written.
        module_file: Path/URI of the user module passed to both Transform and Trainer.
        serving_model_dir: Base directory the Pusher exports the model to.
        beam_pipeline_args: Beam arguments passed to the pipeline.
        sql_query: BigQuery query for the training data (split into train/eval).
        test_sql_query: BigQuery query for the held-out data; written to a single "test" split,
            which is the split the Evaluator reads.
        accuracy_threshold: Lower bound on BinaryAccuracy used by the Evaluator.

    Returns:
        A ``tfx.dsl.Pipeline`` containing every component, including the test ExampleGen.
    """
    example_gen = _get_bigquery_example_gen(sql_query)
    # The Evaluator reads example_splits=['test'], so the held-out data must be written to a
    # "test" split (the train/eval helper would not produce one).
    test_example_gen = _get_test_bigquery_example_gen(test_sql_query)
    statistics_gen = _get_statistics_gen(example_gen)
    schema_gen = _get_schema_gen(statistics_gen)
    example_validator = _get_example_validator(statistics_gen, schema_gen)
    transform = _get_transform(example_gen, schema_gen, module_file)
    trainer = _get_trainer(module_file, transform, schema_gen)
    model_resolver = _get_model_resolver()
    evaluator = _get_evaluator(test_example_gen, trainer, model_resolver, accuracy_threshold)
    pusher = _get_pusher(trainer, evaluator, serving_model_dir)

    # Every node the Evaluator depends on, including the test ExampleGen, must be listed here.
    components = [
        example_gen, test_example_gen,
        statistics_gen, schema_gen, example_validator, transform,
        trainer, model_resolver, evaluator, pusher,
    ]

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=components,
        beam_pipeline_args=beam_pipeline_args)

This cell creates a Kubeflow V2 runner and compiles the pipeline into the JSON definition file.

In [None]:
# Compile the pipeline into the JSON definition file named in config (submitted in the next cell).
runner = kubeflow_v2_dag_runner.KubeflowV2DagRunner(
    config=kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(),
    output_filename=config["PIPELINE_DEFINITION_FILE"])
_ = runner.run(
    _create_pipeline(
        pipeline_name=config["PIPELINE_NAME"],
        pipeline_root=config["PIPELINE_ROOT"],
        module_file=config["MODULE_PATH"],
        serving_model_dir=config["SERVING_MODEL_DIR"],
        beam_pipeline_args=config["BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS"],
        sql_query=sql_query,
        # Required argument of _create_pipeline: the held-out data the Evaluator scores.
        test_sql_query=test_sql_query))

This cell submits the generated pipeline to Vertex

In [None]:
# Submit the compiled pipeline definition to Vertex AI under the configured project and region.
job_kwargs = dict(
    template_path=config["PIPELINE_DEFINITION_FILE"],
    display_name=config["PIPELINE_NAME"],
    project=config["PROJECT"],
    location=config["REGION"],
)
job = pipeline_jobs.PipelineJob(**job_kwargs)
job.submit()

In [None]:
# Serving SavedModel exported by an earlier interactive Trainer run, loaded below for ad-hoc checks.
# NOTE(review): this points at the v03 pipeline, while config (Version: v04) and the gsutil cells use v04 —
# confirm which model version is meant to be inspected.
mpath = 'gs://mlops-mlops/baseline/tfx_artifacts_interactive/baseline-classifier-v03-train-pipeline/Trainer/model/10/Format-Serving/'

In [None]:
# Load the exported SavedModel for manual inspection.
model = tf.saved_model.load(mpath)

In [None]:
# NOTE(review): `data` is reassigned by the next two cells before it is used; this looks like leftover scratch.
data = "Mozilla/5.0 (Linux; x86_64 GNU/Linux) AppleWebKit/601.1 (KHTML, like Gecko) Version/8.0 Safari/601.1 WPE FOG/3.0.0"

In [None]:
# Sample feature values as one flat row (order presumably mirrors the source columns — confirm).
# float("nan") replaces np.nan: numpy is not imported anywhere in this notebook, so np.nan raised NameError.
# NOTE(review): `data` is reassigned by the next cell before it is used.
data = ["GB","isp",233.0,1.095,5.1641688843363749,10.365767746714194,float("nan"),2,2,256,1.19,210,1.0,466,2.19,"Mozilla/5.0 (Linux; x86_64 GNU/Linux) AppleWebKit/601.1 (KHTML, like Gecko) Version/8.0 Safari/601.1 WPE FOG/3.0.0"]

In [None]:
# One sample request keyed by feature name; each value is wrapped as [[value]]
# (the outer list presumably being the batch dimension of size 1).
data = dict(
    avg_turnaround_time=[[10.365767746714194]],
    avg_transfer_time=[[5.1641688843363749]],
    min_total_MB_consumed=[[1.0]],
    max_total_MB_consumed=[[1.19]],
    avg_total_MB_consumed=[[1.095]],
    sum_total_MB_consumed=[[2.19]],
    avg_cnt_request_id=[[233.0]],
    cnt_total_MB_consumed=[[2]],
    min_cnt_request_id=[[210]],
    max_cnt_request_id=[[256]],
    cnt_cnt_request_id=[[2]],
    sum_cnt_request_id=[[466]],
    user_agent2=[["Mozilla/5.0 (Linux; x86_64 GNU/Linux) AppleWebKit/601.1 (KHTML, like Gecko) Version/8.0 Safari/601.1 WPE FOG/3.0.0"]],
    asn_country=[["GB"]],
    asn_type=[["isp"]],
    channel=[[""]],
)

In [None]:
import json

In [None]:
# TF Serving REST predict body in columnar form: a dict of named inputs goes under "inputs".
# The row format under "instances" expects a list of per-example objects, not a dict of columns.
data1 = json.dumps({"signature_name": "serving_default", "inputs": data})

In [None]:
# Display the JSON request body built above.
data1

In [None]:
# tf.saved_model.load returns a generic SavedModel object rather than a Keras model, so call the
# serving signature directly with the sample features as keyword arguments.
model.signatures["serving_default"](**data)

In [None]:
# Print all signatures of the inspected SavedModel (same v03 path as mpath — see the version note there).
!saved_model_cli show --dir gs://mlops-mlops/baseline/tfx_artifacts_interactive/baseline-classifier-v03-train-pipeline/Trainer/model/10/Format-Serving/ --all

In [None]:
# Handle to the serving signature function.
infer = model.signatures["serving_default"]

In [None]:
# Show the output structure of the serving signature.
print(infer.structured_outputs)