In [21]:
import os
import sys
import logging
import json

import tensorflow_model_analysis as tfma

import tfx
from tfx.proto import example_gen_pb2, transform_pb2, trainer_pb2
from tfx.orchestration import pipeline, data_types
from tfx.dsl.components.base import executor_spec
from tfx.components.trainer import executor as trainer_executor
from tfx.extensions.google_cloud_ai_platform.trainer import (
    executor as ai_platform_trainer_executor,
)
from tfx.extensions.google_cloud_big_query.example_gen.component import (
    BigQueryExampleGen,
)
from tfx.components import StatisticsGen
from tfx.components import (
    StatisticsGen,
    ExampleValidator,
    Transform,
    Trainer,
    Evaluator,
    Pusher,
)
from tfx.dsl.components.common.importer import Importer
from tfx.dsl.components.common.resolver import Resolver
from tfx.dsl.experimental import latest_artifacts_resolver
from tfx.dsl.experimental import latest_blessed_model_resolver

from ml_metadata.proto import metadata_store_pb2

In [22]:


from src.tfx_pipelines import config
from src.tfx_pipelines import components as custom_components
from src.common import features, datasource_utils

# Locations read as globals by create_pipeline(): the raw schema directory
# handed to the schema Importer, and the module files handed to the Transform
# and Trainer components. They must be defined before create_pipeline() is
# called, otherwise a fresh kernel fails with NameError.
# NOTE(review): values restored from the previously commented-out lines;
# confirm they match the layout of your working directory.
RAW_SCHEMA_DIR = "src/raw_schema"
TRANSFORM_MODULE_FILE = "src/preprocessing/transformations.py"
TRAIN_MODULE_FILE = "src/model_training/runner.py"


In [76]:
def create_pipeline(
    pipeline_root: str,
    num_epochs: data_types.RuntimeParameter,
    batch_size: data_types.RuntimeParameter,
    learning_rate: data_types.RuntimeParameter,
    hidden_units: data_types.RuntimeParameter,
    metadata_connection_config: metadata_store_pb2.ConnectionConfig = None,
):
    """Assemble the TFX training pipeline.

    The pipeline wires together hyperparameter generation, two BigQuery
    example generators (one split into "train"/"eval", one with a single
    "test" split), schema import, statistics generation, example validation,
    transformation, training, evaluation and pushing. When
    ``config.UPLOAD_MODEL`` is truthy a Vertex AI uploader node is appended
    and made to run after the pusher.

    The notebook-level names ``RAW_SCHEMA_DIR``, ``TRANSFORM_MODULE_FILE`` and
    ``TRAIN_MODULE_FILE`` are read as globals and must be defined before this
    function is called.

    Args:
        pipeline_root: Forwarded to ``pipeline.Pipeline`` as ``pipeline_root``.
        num_epochs: Forwarded to the hyperparameter generator component.
        batch_size: Forwarded to the hyperparameter generator component.
        learning_rate: Forwarded to the hyperparameter generator component.
        hidden_units: Forwarded to the hyperparameter generator component.
        metadata_connection_config: Forwarded to ``pipeline.Pipeline``;
            defaults to ``None``.

    Returns:
        The assembled ``pipeline.Pipeline``.
    """

    # Only the executor spec that is actually selected gets built.
    if config.TRAINING_RUNNER == "local":
        trainer_executor_class = trainer_executor.GenericExecutor
    else:
        trainer_executor_class = ai_platform_trainer_executor.GenericExecutor
    trainer_executor_spec = executor_spec.ExecutorClassSpec(trainer_executor_class)

    # Hyperparameter generation.
    hyperparams_gen = custom_components.hyperparameters_gen(
        num_epochs=num_epochs,
        batch_size=batch_size,
        learning_rate=learning_rate,
        hidden_units=hidden_units,
    ).with_id("HyperparamsGen")

    # Get train source query.
    train_sql_query = datasource_utils.get_training_source_query(
        config.PROJECT,
        config.REGION,
        config.DATASET_DISPLAY_NAME,
        ml_use="UNASSIGNED",
        limit=int(config.TRAIN_LIMIT),
    )

    # The training query result is hash-split into "train" and "eval".
    train_output_config = example_gen_pb2.Output(
        split_config=example_gen_pb2.SplitConfig(
            splits=[
                example_gen_pb2.SplitConfig.Split(
                    name="train", hash_buckets=int(config.NUM_TRAIN_SPLITS)
                ),
                example_gen_pb2.SplitConfig.Split(
                    name="eval", hash_buckets=int(config.NUM_EVAL_SPLITS)
                ),
            ]
        )
    )

    # Train example generation.
    train_example_gen = BigQueryExampleGen(
        query=train_sql_query,
        output_config=train_output_config,
    ).with_id("TrainDataGen")

    # Get test source query.
    test_sql_query = datasource_utils.get_training_source_query(
        config.PROJECT,
        config.REGION,
        config.DATASET_DISPLAY_NAME,
        ml_use="TEST",
        limit=int(config.TEST_LIMIT),
    )

    # The test query result is kept as a single "test" split.
    test_output_config = example_gen_pb2.Output(
        split_config=example_gen_pb2.SplitConfig(
            splits=[
                example_gen_pb2.SplitConfig.Split(name="test", hash_buckets=1),
            ]
        )
    )

    # Test example generation.
    test_example_gen = BigQueryExampleGen(
        query=test_sql_query,
        output_config=test_output_config,
    ).with_id("TestDataGen")

    # Schema importer.
    schema_importer = Importer(
        source_uri=RAW_SCHEMA_DIR,
        artifact_type=tfx.types.standard_artifacts.Schema,
    ).with_id("SchemaImporter")

    # Statistics generation (computed over the training example generator's
    # output).
    statistics_gen = StatisticsGen(
        examples=train_example_gen.outputs['examples']
    ).with_id("StatisticsGen")

    # Example validation.
    example_validator = ExampleValidator(
        statistics=statistics_gen.outputs['statistics'],
        schema=schema_importer.outputs['result'],
    ).with_id("ExampleValidator")

    # Data transformation: analyze on "train" only, transform "train" and
    # "eval".
    transform = Transform(
        examples=train_example_gen.outputs['examples'],
        schema=schema_importer.outputs['result'],
        module_file=TRANSFORM_MODULE_FILE,
        splits_config=transform_pb2.SplitsConfig(
            analyze=["train"], transform=["train", "eval"]
        ),
    ).with_id("DataTransformer")

    # Transform consumes no output of the validator, so the ordering has to be
    # declared explicitly.
    transform.add_upstream_node(example_validator)

    # NOTE: warm-starting and baseline comparison are not wired in. No
    # Resolver node is created, the Trainer receives no `base_model` and the
    # Evaluator receives no `baseline_model`.

    # Model training.
    trainer = Trainer(
        custom_executor_spec=trainer_executor_spec,
        module_file=TRAIN_MODULE_FILE,
        transformed_examples=transform.outputs['transformed_examples'],
        schema=schema_importer.outputs['result'],
        transform_graph=transform.outputs['transform_graph'],
        train_args=trainer_pb2.TrainArgs(num_steps=0),
        eval_args=trainer_pb2.EvalArgs(num_steps=None),
        hyperparameters=hyperparams_gen.outputs['hyperparameters'],
    ).with_id("ModelTrainer")

    # Prepare evaluation config.
    eval_config = tfma.EvalConfig(
        model_specs=[
            tfma.ModelSpec(
                signature_name="serving_tf_example",
                label_key=features.TARGET_FEATURE_NAME,
                prediction_key="probabilities",
            )
        ],
        slicing_specs=[
            tfma.SlicingSpec(),
        ],
        metrics_specs=[
            tfma.MetricsSpec(
                metrics=[
                    tfma.MetricConfig(class_name="ExampleCount"),
                    tfma.MetricConfig(
                        class_name="BinaryAccuracy",
                        threshold=tfma.MetricThreshold(
                            value_threshold=tfma.GenericValueThreshold(
                                lower_bound={"value": float(config.ACCURACY_THRESHOLD)}
                            ),
                            # No baseline model is passed to the Evaluator
                            # below, so this change threshold is expected to
                            # be ignored - TODO confirm for the TFX/TFMA
                            # version in use.
                            change_threshold=tfma.GenericChangeThreshold(
                                direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                                absolute={"value": -1e-10},
                            ),
                        ),
                    ),
                ]
            )
        ],
    )

    # Model evaluation on the "test" split of the test example generator.
    evaluator = Evaluator(
        examples=test_example_gen.outputs['examples'],
        example_splits=["test"],
        model=trainer.outputs['model'],
        eval_config=eval_config,
        schema=schema_importer.outputs['result'],
    ).with_id("ModelEvaluator")

    exported_model_location = os.path.join(
        config.MODEL_REGISTRY_URI, config.MODEL_DISPLAY_NAME
    )
    push_destination = tfx.proto.pusher_pb2.PushDestination(
        filesystem=tfx.proto.pusher_pb2.PushDestination.Filesystem(
            base_directory=exported_model_location
        )
    )

    # Push custom model to model registry.
    pusher = Pusher(
        model=trainer.outputs['model'],
        model_blessing=evaluator.outputs['blessing'],
        push_destination=push_destination,
    ).with_id("ModelPusher")

    pipeline_components = [
        hyperparams_gen,
        train_example_gen,
        test_example_gen,
        statistics_gen,
        schema_importer,
        example_validator,
        transform,
        trainer,
        evaluator,
        pusher,
    ]

    # Upload custom trained model to Vertex AI. The uploader (and its
    # explanation config) is only built when the upload is enabled, so no
    # orphan component is created otherwise.
    if int(config.UPLOAD_MODEL):
        explanation_config = json.dumps(features.generate_explanation_config())
        vertex_model_uploader = custom_components.vertex_model_uploader(
            project=config.PROJECT,
            region=config.REGION,
            model_display_name=config.MODEL_DISPLAY_NAME,
            pushed_model_location=exported_model_location,
            serving_image_uri=config.SERVING_IMAGE_URI,
            explanation_config=explanation_config,
        ).with_id("VertexUploader")
        # The uploader takes a plain location string rather than a pusher
        # output, so the ordering has to be declared explicitly.
        vertex_model_uploader.add_upstream_node(pusher)
        pipeline_components.append(vertex_model_uploader)

    logging.info(
        f"Pipeline components: {[component.id for component in pipeline_components]}"
    )

    beam_pipeline_args = config.BEAM_DIRECT_PIPELINE_ARGS
    if config.BEAM_RUNNER == "DataflowRunner":
        beam_pipeline_args = config.BEAM_DATAFLOW_PIPELINE_ARGS

    logging.info(f"Beam pipeline args: {beam_pipeline_args}")

    return pipeline.Pipeline(
        pipeline_name=config.PIPELINE_NAME,
        pipeline_root=pipeline_root,
        components=pipeline_components,
        beam_pipeline_args=beam_pipeline_args,
        metadata_connection_config=metadata_connection_config,
        # config.ENABLE_CACHE is parsed through int(); hand the pipeline a
        # real boolean.
        enable_cache=bool(int(config.ENABLE_CACHE)),
    )


In [77]:
import os
from kfp.v2.google.client import AIPlatformClient
from tfx.orchestration import data_types
from tfx.orchestration.kubeflow.v2 import kubeflow_v2_dag_runner


#from src.tfx_pipelines import config, training_pipeline, prediction_pipeline
#from src.model_training import defaults


def compile_training_pipeline1(pipeline_definition_file):
    """Compile the packaged training pipeline into a job-spec file.

    Builds the pipeline with ``training_pipeline.create_pipeline`` using
    runtime parameters whose defaults come from ``defaults``, then runs it
    through ``KubeflowV2DagRunner`` with ``write_out=True``.

    Args:
        pipeline_definition_file: Passed to the runner as ``output_filename``.

    Returns:
        Whatever ``KubeflowV2DagRunner.run`` returns for the pipeline.
    """
    # The notebook-level imports of these two modules are commented out, so
    # they are imported here; otherwise the function fails with NameError.
    from src.model_training import defaults
    from src.tfx_pipelines import training_pipeline

    pipeline_root = os.path.join(
        config.ARTIFACT_STORE_URI,
        config.PIPELINE_NAME,
    )

    managed_pipeline = training_pipeline.create_pipeline(
        pipeline_root=pipeline_root,
        num_epochs=data_types.RuntimeParameter(
            name="num_epochs",
            default=defaults.NUM_EPOCHS,
            ptype=int,
        ),
        batch_size=data_types.RuntimeParameter(
            name="batch_size",
            default=defaults.BATCH_SIZE,
            ptype=int,
        ),
        learning_rate=data_types.RuntimeParameter(
            name="learning_rate",
            default=defaults.LEARNING_RATE,
            ptype=float,
        ),
        # hidden_units travels as a comma-separated string parameter.
        hidden_units=data_types.RuntimeParameter(
            name="hidden_units",
            default=",".join(str(u) for u in defaults.HIDDEN_UNITS),
            ptype=str,
        ),
    )

    runner = kubeflow_v2_dag_runner.KubeflowV2DagRunner(
        config=kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(
            default_image=config.TFX_IMAGE_URI
        ),
        output_filename=pipeline_definition_file,
    )

    return runner.run(managed_pipeline, write_out=True)


def compile_prediction_pipeline(pipeline_definition_file):
    """Compile the packaged prediction pipeline into a job-spec file.

    Builds the pipeline with ``prediction_pipeline.create_pipeline`` and runs
    it through ``KubeflowV2DagRunner`` with ``write_out=True``.

    Args:
        pipeline_definition_file: Passed to the runner as ``output_filename``.

    Returns:
        Whatever ``KubeflowV2DagRunner.run`` returns for the pipeline.
    """
    # The notebook-level import of this module is commented out, so it is
    # imported here; otherwise the function fails with NameError.
    from src.tfx_pipelines import prediction_pipeline

    pipeline_root = os.path.join(
        config.ARTIFACT_STORE_URI,
        config.PIPELINE_NAME,
    )

    managed_pipeline = prediction_pipeline.create_pipeline(
        pipeline_root=pipeline_root,
    )

    runner = kubeflow_v2_dag_runner.KubeflowV2DagRunner(
        config=kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(
            default_image=config.TFX_IMAGE_URI
        ),
        output_filename=pipeline_definition_file,
    )

    return runner.run(managed_pipeline, write_out=True)


def submit_pipeline(pipeline_definition_file):
    """Create a pipeline run from a compiled job-spec file.

    The client is built from ``config.PROJECT`` and ``config.REGION``. The
    response of the submission call is not returned.

    Args:
        pipeline_definition_file: Job-spec path handed to
            ``create_run_from_job_spec``.
    """
    client_options = {"project_id": config.PROJECT, "region": config.REGION}
    AIPlatformClient(**client_options).create_run_from_job_spec(
        pipeline_definition_file
    )


In [78]:
#from src.tfx_pipelines import runner

# File name of the compiled job spec: the configured pipeline name plus ".json".
pipeline_definition_file = "{}.json".format(config.PIPELINE_NAME)
print(pipeline_definition_file)
#pipeline_definition = compile_training_pipeline(pipeline_definition_file)

chicago-taxi-tips-classifier-train-pipeline.json


In [79]:
def compile_training_pipeline12(pipeline_definition_file):
    """Compile the notebook-defined training pipeline into a job-spec file.

    Builds the pipeline with the ``create_pipeline`` function defined in this
    notebook, using runtime parameters whose defaults come from ``defaults``,
    then runs it through ``KubeflowV2DagRunner`` with ``write_out=True``.

    Args:
        pipeline_definition_file: Passed to the runner as ``output_filename``.

    Returns:
        Whatever ``KubeflowV2DagRunner.run`` returns for the pipeline; later
        cells index it like a nested dict starting at ``'pipelineSpec'``.
    """
    # The notebook-level import of `defaults` is commented out, so it is
    # imported here; otherwise a fresh kernel fails with NameError.
    from src.model_training import defaults

    pipeline_root = os.path.join(
        config.ARTIFACT_STORE_URI,
        config.PIPELINE_NAME,
    )

    managed_pipeline = create_pipeline(
        pipeline_root=pipeline_root,
        num_epochs=data_types.RuntimeParameter(
            name="num_epochs",
            default=defaults.NUM_EPOCHS,
            ptype=int,
        ),
        batch_size=data_types.RuntimeParameter(
            name="batch_size",
            default=defaults.BATCH_SIZE,
            ptype=int,
        ),
        learning_rate=data_types.RuntimeParameter(
            name="learning_rate",
            default=defaults.LEARNING_RATE,
            ptype=float,
        ),
        # hidden_units travels as a comma-separated string parameter.
        hidden_units=data_types.RuntimeParameter(
            name="hidden_units",
            default=",".join(str(u) for u in defaults.HIDDEN_UNITS),
            ptype=str,
        ),
    )

    runner = kubeflow_v2_dag_runner.KubeflowV2DagRunner(
        config=kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(
            default_image=config.TFX_IMAGE_URI
        ),
        output_filename=pipeline_definition_file,
    )

    return runner.run(managed_pipeline, write_out=True)


In [98]:
# Compile the notebook-defined training pipeline and keep the returned spec.
pipeline_definition = compile_training_pipeline12(
    pipeline_definition_file=pipeline_definition_file
)

ravi




In [99]:
# Point the ModelPusher executor at a specific container image instead of the
# default image the spec was compiled with.
pusher_container = pipeline_definition['pipelineSpec']['deploymentSpec']['executors']['ModelPusher_executor']['container']
pusher_container['image'] = 'gcr.io/aiops-industrialization/chicago-taxi-tips:latest'

# The compile step already wrote the job spec to `pipeline_definition_file`
# (write_out=True), and the upload and submit cells read that file rather than
# this in-memory dict. Write the patched spec back so the override actually
# reaches the submitted job.
with open(pipeline_definition_file, "w") as job_spec_file:
    json.dump(pipeline_definition, job_spec_file)

In [100]:
# Display the ModelPusher executor image held in the in-memory spec to check
# the override from the previous cell.
pipeline_definition['pipelineSpec']['deploymentSpec']['executors']['ModelPusher_executor']['container']['image']

'gcr.io/aiops-industrialization/chicago-taxi-tips:latest'

In [104]:
# Display the file name of the compiled job spec.
pipeline_definition_file

'chicago-taxi-tips-classifier-train-pipeline.json'

In [101]:
# Destination for compiled pipeline specs. The parameterized form below is
# disabled and a literal bucket path is used instead; BUCKET is only assigned
# in a later cell of this notebook.
#PIPELINES_STORE = f"gs://{BUCKET}/{DATASET_DISPLAY_NAME}/compiled_pipelines/"
PIPELINES_STORE='gs://aiops-industrialization-bucket-ravi/chicago-taxi-tips/compiled_pipelines/'
# Copy the local job-spec file into PIPELINES_STORE. This uploads the file on
# disk, not the in-memory `pipeline_definition` dict.
!gsutil cp {pipeline_definition_file} {PIPELINES_STORE}

Copying file://chicago-taxi-tips-classifier-train-pipeline.json [Content-Type=application/json]...
/ [1 files][ 25.2 KiB/ 25.2 KiB]                                                
Operation completed over 1 objects/25.2 KiB.                                     


In [102]:
# Change to your project id.
PROJECT = "aiops-industrialization"
# Change to your region.
REGION = "us-central1"
# Change to your bucket name.
BUCKET = "aiops-industrialization-bucket-ravi"
# Service account identity; not referenced by the visible submission cell.
SERVICE_ACCOUNT = "175728527123-compute@developer.gserviceaccount.com"

In [106]:
from kfp.v2.google.client import AIPlatformClient

# Client bound to the project and region set in the previous cell.
pipeline_client = AIPlatformClient(project_id=PROJECT, region=REGION)

# Runtime parameter overrides for this run; the names match the
# RuntimeParameter names declared when the pipeline was compiled.
run_parameter_values = {
    'learning_rate': 0.003,
    'batch_size': 512,
    'hidden_units': '128,128',
    'num_epochs': 30,
}

# Submit the job spec file from disk and keep the response.
job = pipeline_client.create_run_from_job_spec(
    job_spec_path=pipeline_definition_file,
    parameter_values=run_parameter_values,
)