In [1]:
import json
from typing import NamedTuple


from kfp import dsl
from kfp.v2 import compiler
from typing import NamedTuple
from kfp.v2 import dsl
from kfp.v2.dsl import (
    component,
    InputPath,
    OutputPath,
    Input,
    Output,
    Artifact,
    Dataset,
    Model,
    ClassificationMetrics,
    Metrics,
)

from kfp.v2.google.client import AIPlatformClient
from google_cloud_pipeline_components import aiplatform as gcc_aip

In [2]:
from view_demo.utils import env_vars as evar

# Deployment-specific settings; every value is read from view_demo.utils.env_vars
# so nothing environment-specific is hard-coded in this notebook.

# GCP project / location
PROJECT_ID = evar.PROJECT_ID
REGION = evar.REGION

# Input data and pipeline storage
DATASET_CSV = evar.DATASET_CSV
PIPELINE_ROOT = evar.PIPELINE_ROOT
STAGING_BUCKET = evar.STAGING_BUCKET

# Root directory under which the component YAML files are written
SRC_ROOT = evar.SRC_ROOT

# Container images
BASE_IMAGE_URI = evar.BASE_IMAGE_URI
BASE_TRAINING_IMAGE = evar.BASE_TRAINING_IMAGE
TF_SERVING_IMAGE = evar.TF_SERVING_IMAGE

# Other resources (NOTE(review): not referenced elsewhere in this notebook)
TENSORBOARD_INST = evar.TENSORBOARD_INST

In [4]:
@component(
    base_image=BASE_IMAGE_URI,
    output_component_file=f'{SRC_ROOT}/preprocess/preprocess.yaml',
)
def view_preprocess(
    project_id: str,
    raw_dataset: str,
    out_dataset: OutputPath(),
):
    """Build the training dataset from the raw CSV and publish its location.

    Calls ``view_demo.preprocess.create_dataset`` and writes the value it
    returns verbatim to ``out_dataset``. That value is presumably a BigQuery
    table path (judging by the original variable name); TODO confirm.

    Args:
        project_id: GCP project forwarded to ``create_dataset``.
        raw_dataset: Location of the raw CSV, forwarded as ``csv_path``.
        out_dataset: KFP-managed file path that receives the dataset location.
    """
    # Imported inside the body: a KFP lightweight component runs only this
    # function's source inside the container.
    from view_demo.preprocess import create_dataset

    dataset_location = create_dataset(project_id=project_id, csv_path=raw_dataset)
    with open(out_dataset, 'w') as handle:
        handle.write(dataset_location)
        

In [16]:
@component(
    base_image=BASE_IMAGE_URI,
    output_component_file=f'{SRC_ROOT}/train/train.yaml',
)
def view_train(
    project_id: str,
    input_dataset_path: InputPath(),
    metrics: Output[Metrics],
    model: Output[Model],
    experiment_prefix: str,
    staging_bucket: str,
    context_window: int = 24,
) -> float:
    """Run the custom-container training job and return its validation MAE.

    Each call creates a fresh Vertex AI experiment named
    ``experiment_prefix + <YYYYmmddHHMMSS>``. It then launches the training
    container with that experiment id, and afterwards reads ``metric.val_mae``
    for the run named ``context-window-<context_window>`` back from the
    experiment.

    Args:
        project_id: GCP project passed to ``aiplatform.init``.
        input_dataset_path: File produced by the preprocess step. Its contents
            are only logged here.
        metrics: KFP Metrics artifact. ``val_mae`` is logged to it.
        model: KFP Model artifact. Its ``uri`` is exported to the trainer as
            ``AIP_MODEL_DIR``, and its parent directory is used as
            ``base_output_dir``.
        experiment_prefix: Prefix of the per-call experiment id.
        staging_bucket: Staging bucket for ``aiplatform.init``. Also forwarded
            to the trainer as ``--staging-bucket``.
        context_window: Forwarded to the trainer as ``--context-window``.

    Returns:
        The last ``metric.val_mae`` recorded for this run, as a float.

    Raises:
        ValueError: If the experiment contains no row for the expected run
            name.
    """
    # All imports live inside the body: KFP lightweight components ship only
    # this function's source to the container.
    from datetime import datetime
    import logging
    import os

    from google.cloud import aiplatform

    # The root logger defaults to WARNING, which silently drops every
    # logging.info call below. basicConfig is a no-op if logging was already
    # configured.
    logging.basicConfig(level=logging.INFO)

    with open(input_dataset_path) as f:
        logging.info(f"input_dataset is: {f.read()}")

    # One experiment per invocation. NOTE(review): run_id is not passed to the
    # trainer; it is assumed to name its run the same way from --context-window.
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    experiment_id = experiment_prefix + timestamp
    run_id = f'context-window-{context_window}'

    aiplatform.init(
        project=project_id,
        staging_bucket=staging_bucket,
        experiment=experiment_id,
    )

    job = aiplatform.CustomContainerTrainingJob(
        display_name="view-training",
        container_uri='gcr.io/pytorch-tpu-nfs/test-custom-trainer:latest',
        model_serving_container_image_uri="gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest",
    )
    logging.info(f"Type of experiment_id :{type(experiment_id)}")
    logging.info(f"Type of staging_bucket :{type(staging_bucket)}")
    logging.info(f"Type of context_window :{type(context_window)}")

    # The Model resource returned by run() is not used. Training output is
    # directed at model.uri through AIP_MODEL_DIR.
    job.run(
        replica_count=1,
        model_display_name="temp-prediction",
        args=[
            f'--experiment-id={experiment_id}',
            f'--staging-bucket={staging_bucket}',
            f'--context-window={context_window}',
        ],
        environment_variables={'AIP_MODEL_DIR': model.uri},
        base_output_dir=os.path.dirname(model.uri),
    )

    metrics_df = aiplatform.get_experiment_df(experiment_id)
    run_rows = metrics_df.loc[metrics_df['run_name'] == run_id]
    if run_rows.empty:
        # Fail with an actionable message instead of an IndexError from .values[-1].
        raise ValueError(
            f"No metrics found for run '{run_id}' in experiment '{experiment_id}'"
        )
    val_mae = float(run_rows['metric.val_mae'].values[-1])
    metrics.log_metric('val_mae', val_mae)
    logging.info(f"Mean Error is:{val_mae}")
    return val_mae
        
    

In [17]:
@component(
    base_image=BASE_IMAGE_URI,
    output_component_file=f'{SRC_ROOT}/tests/fail_op.yaml',
)
def fail_op(message: str = "Metric is below threshhold"):
    """Fail the pipeline step unconditionally by raising ``ValueError(message)``.

    In this notebook it is the only task of the low-quality branch of
    ``view_pipeline``. As a result, a model that misses the MAE cutoff turns
    the run red instead of letting it finish quietly.

    Args:
        message: Text carried by the raised ``ValueError``.
    """
    failure = ValueError(message)
    raise failure


In [18]:
@component(
    base_image=BASE_IMAGE_URI,
    output_component_file=f'{SRC_ROOT}/change-type.yaml',
)
def get_model_uri(model: Input[Model]) -> str:
    """Expose the ``uri`` of a Model artifact as a plain string output.

    This bridges artifact-typed outputs to ops that expect a string parameter,
    e.g. ``ModelUploadOp(artifact_uri=...)`` in ``view_pipeline``.

    Args:
        model: Input Model artifact.

    Returns:
        ``model.uri`` unchanged.
    """
    artifact_location = model.uri
    return artifact_location


In [19]:
@dsl.pipeline(
    name="view-test-pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def view_pipeline(
    project_id: str = PROJECT_ID,
    raw_dataset: str = DATASET_CSV,
    staging_bucket: str = STAGING_BUCKET,
    mae_cutoff: float = 5.0,
    model_display_name: str = 'forecast-custom',
    context_window: int = 24,
    experiment_prefix: str = 'weather-prediction-'
):
    """Preprocess, train, then either deploy the model or fail the run.

    The gate compares the training step's returned validation MAE against
    ``mae_cutoff``:

    * ``val_mae <  mae_cutoff``: upload the model, create an endpoint, and
      deploy the model to it.
    * ``val_mae >= mae_cutoff``: run ``fail_op`` so the pipeline fails.

    Args:
        project_id: GCP project used by every step.
        raw_dataset: Raw CSV location handed to preprocessing.
        staging_bucket: Staging bucket for the training job.
        mae_cutoff: Maximum (exclusive) validation MAE that allows deployment.
        model_display_name: Display name of the uploaded and deployed model.
        context_window: Forwarded to the trainer.
        experiment_prefix: Prefix of the per-run experiment id.
    """
    preprocess_task = view_preprocess(
        project_id=project_id,
        raw_dataset=raw_dataset
    )
    train_task = view_train(
        project_id=project_id,
        input_dataset_path=preprocess_task.outputs["out_dataset"],
        context_window=context_window,
        experiment_prefix=experiment_prefix,
        staging_bucket=staging_bucket
    )
    #train_task.execution_options.caching_strategy.max_cache_staleness = "P0D"

    # view_train's float return value is exposed as the 'output' output.
    val_mae = train_task.outputs['output']

    with dsl.Condition(val_mae < mae_cutoff, name="mae_test"):
        get_model_task = get_model_uri(train_task.outputs['model'])
        model_upload_op = gcc_aip.ModelUploadOp(
            project=project_id,
            display_name=model_display_name,
            artifact_uri=get_model_task.outputs['output'],
            serving_container_image_uri="gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest",
            serving_container_environment_variables={"NOT_USED": "NO_VALUE"},
        )
        # Already implied by the data dependency through get_model_task;
        # kept as an explicit ordering constraint.
        model_upload_op.after(train_task)
        endpoint_create_op = gcc_aip.EndpointCreateOp(
            project=project_id,
            display_name="pipelines-created-endpoint",
        )
        model_deploy_op = gcc_aip.ModelDeployOp(  # noqa: F841
            project=project_id,
            endpoint=endpoint_create_op.outputs["endpoint"],
            model=model_upload_op.outputs["model"],
            deployed_model_display_name=model_display_name,
            machine_type="n1-standard-4",
        )

    # Exact complement of the branch above. With `>`, a val_mae equal to the
    # cutoff matched neither branch: nothing was deployed, yet the run succeeded.
    with dsl.Condition(val_mae >= mae_cutoff, name="Low_Quality"):
        fail_op()


In [20]:
from kfp.v2 import compiler as v2compiler

# Serialize view_pipeline into the JSON job spec that is submitted further down.
pipeline_compiler = v2compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=view_pipeline,
    package_path='view_pipeline_spec.json',
)

In [21]:
from kfp.v2.google.client import AIPlatformClient  # noqa: F811

# Pipelines client bound to the configured project and region.
api_client = AIPlatformClient(project_id=PROJECT_ID, region=REGION)



In [22]:
# Submit the compiled job spec. No pipeline parameters are overridden here,
# so the defaults declared on view_pipeline apply.
result = api_client.create_run_from_job_spec(
    job_spec_path="view_pipeline_spec.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={},
    enable_caching=True,
)

In [None]:
# Check on the run submitted above using the project's helper.
# NOTE(review): its exact behaviour (polling vs. a one-shot report) is defined
# in view_demo.utils.check_pipeline_status; confirm there.
from view_demo.utils.check_pipeline_status import check_pipeline_status
check_pipeline_status(api_client, result)