Install the dependencies.

In [None]:
# !pip install -r requirements.txt

In [None]:
# Install TFX into the environment via a shell command.
!pip install tfx

In [None]:
# Run this if you see the error "cannot import name 'WKBWriter' from 'shapely.geos'":
# it upgrades google-cloud-aiplatform and installs a shapely version below 2.
!pip install -U google-cloud-aiplatform "shapely<2"

Import Python libraries

In [None]:
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from google.cloud import storage
import tensorflow as tf
from tfx import v1 as tfx
from tfx.proto import example_gen_pb2
from tfx.dsl.components.common import resolver
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.dsl.experimental import latest_blessed_model_resolver
import tensorflow_model_analysis as tfma
from typing import List, Optional
import os
import logging
logging.getLogger().setLevel(logging.INFO)

Declare variables

In [2]:
# --- Google Cloud project settings ---
GCS_BUCKET_NAME = 'vekimenko_sky_test'
GOOGLE_CLOUD_REGION = 'europe-west2'
GOOGLE_CLOUD_PROJECT = 'strategy-bi-ltd'

# --- Pipeline identity and GCS locations ---
PIPELINE_NAME = 'sky_test'
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)
# The input data lives in a different bucket ('learn_tfx') than the pipeline outputs.
DATA_ROOT = 'gs://{}/data'.format('learn_tfx')
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# --- User module file (local source and its destination inside the bucket) ---
MODULE_FILE = 'sky_module.py'
SOURCE_FILE_NAME = './{}'.format(MODULE_FILE)
MODULE_PATH_IN_BUCKET = 'pipeline_root/{}/{}'.format(PIPELINE_NAME, MODULE_FILE)
# Full gs:// URI of the same object, derived from the in-bucket path so the two
# can never drift apart.
MODULE_PATH = 'gs://{}/{}'.format(GCS_BUCKET_NAME, MODULE_PATH_IN_BUCKET)

# --- Compiled pipeline definition and Beam arguments ---
PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
    '--project=' + GOOGLE_CLOUD_PROJECT,
    # Built as a plain URI string: os.path.join would use the OS path
    # separator, which is not '/' on every platform.
    '--temp_location=gs://{}/tmp'.format(GCS_BUCKET_NAME),
]

Initialise the Vertex AI (`aiplatform`) client

In [3]:
# Initialise the aiplatform SDK with the project and region declared above.
aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

Create GCP bucket

In [4]:
# Create the pipeline bucket, or reuse it when it already exists so that this
# cell can be re-run (creating a bucket that already exists raises an error).
storage_client = storage.Client()
bucket = storage_client.lookup_bucket(GCS_BUCKET_NAME)  # None when the bucket is not found
if bucket is None:
    bucket = storage_client.bucket(GCS_BUCKET_NAME)
    bucket.storage_class = "COLDLINE"
    new_bucket = storage_client.create_bucket(bucket, location=GOOGLE_CLOUD_REGION)
else:
    # Keep `new_bucket` defined in both branches for any cell that reads it.
    new_bucket = bucket

Grant admin privileges on the new bucket to the service account

In [5]:
# Grant the storage admin role on the bucket to the service account.
BUCKET_ADMIN_ROLE = 'roles/storage.admin'
BUCKET_ADMIN_MEMBER = 'serviceAccount:792237211962-compute@developer.gserviceaccount.com'

policy = bucket.get_iam_policy(requested_policy_version=3)

# Only append the binding (and write the policy back) when an equivalent
# unconditional binding is not present yet, so re-running the cell does not
# keep adding duplicates.
already_granted = any(
    binding.get("role") == BUCKET_ADMIN_ROLE
    and BUCKET_ADMIN_MEMBER in binding.get("members", ())
    and not binding.get("condition")
    for binding in policy.bindings
)
if not already_granted:
    policy.bindings.append(
        {
            "role": BUCKET_ADMIN_ROLE,
            "members": {BUCKET_ADMIN_MEMBER},
        })
    bucket.set_iam_policy(policy)

Upload the module file to the bucket

In [6]:
# Upload the local module file to its destination path inside the bucket.
# NOTE(review): per the GCS precondition docs, if_generation_match=0 makes the
# upload succeed only when the object does not exist yet, so re-running this
# cell after a successful upload is expected to fail — confirm this is intended.
generation_match_precondition = 0
blob = bucket.blob(MODULE_PATH_IN_BUCKET)
blob.upload_from_filename(
    SOURCE_FILE_NAME,
    if_generation_match=generation_match_precondition,
)

This function creates the ExampleGen component

In [7]:
def _get_example_gen():
    """Build the ImportExampleGen component that reads from DATA_ROOT.

    Two input splits are configured: 'train' (pattern 'train/*') and
    'eval' (pattern 'eval/*').
    """
    split_patterns = (('train', 'train/*'), ('eval', 'eval/*'))
    splits = [
        example_gen_pb2.Input.Split(name=split_name, pattern=split_pattern)
        for split_name, split_pattern in split_patterns
    ]
    return tfx.components.ImportExampleGen(
        input_base=DATA_ROOT,
        input_config=example_gen_pb2.Input(splits=splits))

This function creates the StatisticsGen component

In [8]:
def _get_statistics_gen(example_gen):
    """Build a StatisticsGen component fed by the 'examples' output of `example_gen`."""
    examples = example_gen.outputs['examples']
    return tfx.components.StatisticsGen(examples=examples)

This function creates the SchemaGen component

In [9]:
def _get_schema_gen(statistics_gen):
    """Build a SchemaGen component from the 'statistics' output of `statistics_gen`.

    Feature-shape inference is switched on (`infer_feature_shape=True`).
    """
    statistics = statistics_gen.outputs['statistics']
    return tfx.components.SchemaGen(
        statistics=statistics,
        infer_feature_shape=True)

This function creates the ExampleValidator component

In [10]:
def _get_example_validator(statistics_gen, schema_gen):
    """Build an ExampleValidator component.

    It receives the 'statistics' output of `statistics_gen` and the 'schema'
    output of `schema_gen`.
    """
    validator_inputs = {
        'statistics': statistics_gen.outputs['statistics'],
        'schema': schema_gen.outputs['schema'],
    }
    return tfx.components.ExampleValidator(**validator_inputs)

This function creates the Transform component

In [11]:
def _get_transform(example_gen, schema_gen, module_file):
    """Build a Transform component.

    Args:
        example_gen: component whose 'examples' output is transformed.
        schema_gen: component whose 'schema' output is passed to Transform.
        module_file: module file passed through to Transform as `module_file`.
    """
    examples = example_gen.outputs['examples']
    schema = schema_gen.outputs['schema']
    return tfx.components.Transform(
        examples=examples, schema=schema, module_file=module_file)

This function creates the Trainer component

In [12]:
def _get_trainer(module_file, transform, schema_gen, train_steps=10000, eval_steps=5000):
    """Build the Trainer component.

    Args:
        module_file: module file passed through to Trainer as `module_file`.
        transform: component providing the 'transformed_examples' and
            'transform_graph' outputs.
        schema_gen: component providing the 'schema' output.
        train_steps: number of training steps (`TrainArgs.num_steps`).
            Defaults to 10000, the value that used to be hard-coded.
        eval_steps: number of evaluation steps (`EvalArgs.num_steps`).
            Defaults to 5000, the value that used to be hard-coded.

    Returns:
        The configured `tfx.components.Trainer`.
    """
    return tfx.components.Trainer(
        module_file=module_file,
        examples=transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        schema=schema_gen.outputs['schema'],
        train_args=trainer_pb2.TrainArgs(num_steps=train_steps),
        eval_args=trainer_pb2.EvalArgs(num_steps=eval_steps),
        # NOTE(review): 'labels_path' is forwarded to the module file as-is;
        # the value looks like a placeholder — confirm against sky_module.py.
        custom_config={'labels_path': 'labels_path'})

This function creates the Resolver component

In [13]:
def _get_model_resolver():
    """Build a Resolver that uses the LatestBlessedModelResolver strategy.

    The resolver is given `Model` and `ModelBlessing` channels and the
    component id 'latest_blessed_model_resolver'.
    """
    blessed_model_resolver = resolver.Resolver(
        strategy_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
        model=Channel(type=Model),
        model_blessing=Channel(type=ModelBlessing))
    return blessed_model_resolver.with_id('latest_blessed_model_resolver')

This function creates the Evaluator component

In [14]:
def _get_evaluator(transform, trainer, model_resolver, accuracy_threshold):
    """Build the Evaluator component with a SparseCategoricalAccuracy threshold.

    Args:
        transform: component providing the 'transformed_examples' output.
        trainer: component providing the candidate 'model' output.
        model_resolver: component providing the baseline 'model' output.
        accuracy_threshold: lower bound for the SparseCategoricalAccuracy
            value threshold.

    Returns:
        The configured `tfx.components.Evaluator`.
    """
    # Value threshold (lower bound) plus a change threshold configured as
    # HIGHER_IS_BETTER with an absolute value of -1e-3.
    accuracy_gate = tfma.MetricThreshold(
        value_threshold=tfma.GenericValueThreshold(
            lower_bound={'value': accuracy_threshold}),
        change_threshold=tfma.GenericChangeThreshold(
            direction=tfma.MetricDirection.HIGHER_IS_BETTER,
            absolute={'value': -1e-3}))
    accuracy_metric = tfma.MetricConfig(
        class_name='SparseCategoricalAccuracy',
        threshold=accuracy_gate)

    evaluation_settings = tfma.EvalConfig(
        model_specs=[tfma.ModelSpec(label_key='label')],
        slicing_specs=[tfma.SlicingSpec()],
        metrics_specs=[tfma.MetricsSpec(metrics=[accuracy_metric])])

    return tfx.components.Evaluator(
        examples=transform.outputs['transformed_examples'],
        model=trainer.outputs['model'],
        baseline_model=model_resolver.outputs['model'],
        eval_config=evaluation_settings)

This function creates the Pusher component

In [15]:
def _get_pusher(trainer, evaluator, serving_model_dir):
    """Build a Pusher component with a filesystem push destination.

    Args:
        trainer: component providing the 'model' output.
        evaluator: component providing the 'blessing' output.
        serving_model_dir: base directory of the filesystem push destination.
    """
    filesystem_target = pusher_pb2.PushDestination.Filesystem(
        base_directory=serving_model_dir)
    destination = pusher_pb2.PushDestination(filesystem=filesystem_target)
    return tfx.components.Pusher(
        model=trainer.outputs['model'],
        model_blessing=evaluator.outputs['blessing'],
        push_destination=destination)

This function creates the pipeline

In [21]:
def _create_pipeline(pipeline_name: str, 
                     pipeline_root: str, 
                     module_file: str, 
                     serving_model_dir: str,
                     beam_pipeline_args: Optional[List[str]],
                     accuracy_threshold: float = 0.35
                     ) -> tfx.dsl.Pipeline:
    """Assemble the TFX pipeline from the component factories in this notebook.

    Args:
        pipeline_name: name given to the pipeline.
        pipeline_root: root location passed to the pipeline.
        module_file: module file used by both Transform and Trainer.
        serving_model_dir: base directory the Pusher pushes the model to.
        beam_pipeline_args: Beam arguments passed to the pipeline.
        accuracy_threshold: lower accuracy bound handed to the Evaluator.

    Returns:
        A `tfx.dsl.Pipeline` containing ExampleGen, StatisticsGen, SchemaGen,
        ExampleValidator, Transform, Trainer, the model Resolver, Evaluator
        and Pusher.
    """
    # Data ingestion and validation.
    ingest = _get_example_gen()
    stats = _get_statistics_gen(ingest)
    schema = _get_schema_gen(stats)
    validator = _get_example_validator(stats, schema)

    # Feature engineering and training.
    preprocessing = _get_transform(ingest, schema, module_file)
    training = _get_trainer(module_file, preprocessing, schema)

    # Evaluation against the resolved baseline model, then push.
    baseline_resolver = _get_model_resolver()
    evaluation = _get_evaluator(
        preprocessing, training, baseline_resolver, accuracy_threshold)
    push = _get_pusher(training, evaluation, serving_model_dir)

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=[
            ingest, stats, schema, validator, preprocessing,
            training, baseline_resolver, evaluation, push,
        ],
        beam_pipeline_args=beam_pipeline_args)

This cell creates a runner and generates a pipeline

In [None]:
# Create a Kubeflow V2 runner that writes the pipeline definition to
# PIPELINE_DEFINITION_FILE.
runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
# Build the pipeline and hand it to the runner; the return value is discarded.
# NOTE(review): the pipeline name 'sky-test' is hard-coded and differs from
# PIPELINE_NAME ('sky_test') — presumably because the runner does not accept
# underscores in pipeline names; confirm.
_ = runner.run(
    _create_pipeline(
        pipeline_name='sky-test',
        pipeline_root=PIPELINE_ROOT,
        module_file=MODULE_PATH,
        serving_model_dir=SERVING_MODEL_DIR,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS))

This cell submits the generated pipeline to Vertex

In [None]:
# Create a pipeline job from the generated definition file and submit it.
job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE, display_name=PIPELINE_NAME)
job.submit()

Once the pipeline has finished and the model has been saved, the following function returns the existing endpoint with the given display name, or creates a new one if none exists.

In [4]:
def create_endpoint(endpoint_display_name):
    """Return the endpoint with the given display name, creating it if needed.

    Existing endpoints are looked up by display name, ordered by
    `update_time`; when at least one matches, the last one in the returned
    list is reused. Otherwise a new endpoint is created.

    Args:
        endpoint_display_name: display name to look up or to create.

    Returns:
        The reused or newly created `aiplatform.Endpoint`.
    """
    # The value is quoted so that display names containing characters such as
    # '-' or spaces are passed as a single literal in the filter expression.
    matching_endpoints = aiplatform.Endpoint.list(
        filter=f'display_name="{endpoint_display_name}"',
        order_by="update_time")

    if matching_endpoints:
        logging.info("Endpoint %s already exists.", endpoint_display_name)
        endpoint = matching_endpoints[-1]
    else:
        endpoint = aiplatform.Endpoint.create(display_name=endpoint_display_name)
    logging.info("Endpoint is ready.")
    return endpoint

In [None]:
# Look up (or create) the endpoint used for serving.
endpoint_display_name = 'sky-test'
endpoint = create_endpoint(endpoint_display_name)

In [None]:
# endpoint = aiplatform.Endpoint('projects/792237211962/locations/europe-west2/endpoints/1417393633505574912')

This function returns the existing model with the given display name, or uploads a new one if none exists.

In [5]:
def create_model(model_display_name, artifact_uri, env_uri):
    """Return the model with the given display name, uploading it if needed.

    Existing models are looked up by display name, ordered by `update_time`;
    when at least one matches, the last one in the returned list is reused
    and `artifact_uri` / `env_uri` are ignored. Otherwise a new model is
    uploaded.

    Args:
        model_display_name: display name to look up or to upload under.
        artifact_uri: location of the model artifacts, used only on upload.
        env_uri: serving container image URI, used only on upload.

    Returns:
        The reused or newly uploaded `aiplatform.Model`.
    """
    # The value is quoted so that display names containing characters such as
    # '-' or spaces are passed as a single literal in the filter expression.
    matching_models = aiplatform.Model.list(
        filter=f'display_name="{model_display_name}"',
        order_by="update_time")

    if matching_models:
        logging.info("Model %s already exists.", model_display_name)
        model = matching_models[-1]
    else:
        model = aiplatform.Model.upload(display_name=model_display_name,
                                        artifact_uri=artifact_uri,
                                        serving_container_image_uri=env_uri)

    logging.info("Model is ready.")
    return model

In [39]:
# Look up (or upload) the model to deploy.
model_display_name = 'sky-cifar10-model'
# NOTE(review): this hard-coded path repeats the value of SERVING_MODEL_DIR and
# pins one specific pushed version directory — update it after a new pipeline run.
artifact_uri='gs://vekimenko_sky_test/serving_model/sky_test/1702120393'
# NOTE(review): this image name looks like a base deep-learning image rather
# than a prediction-serving image — confirm it can actually serve the model.
env_uri='gcr.io/deeplearning-platform-release/base-cpu:m79'
model = create_model(model_display_name, artifact_uri, env_uri)

In [6]:
# model = aiplatform.Model('projects/792237211962/locations/europe-west2/models/414234408694841344@1')

This cell deploys the model to the endpoint

In [None]:
# Deploy `model` to `endpoint`; both must have been defined by the cells above.
deployed_model = endpoint.deploy(model=model)