## Set Variables

In [193]:
# Google Cloud project and region.
PROJECT_ID = 'jchavezar-demo'
REGION = 'us-central1'

# CSV in Cloud Storage and the display name of the Vertex AI dataset built from it.
DATASET_GCS_SOURCE = 'gs://jchavezar-public-datasets/auto-mpg.csv'
DATASET_NAME = 'mpg-dataset'

# Cloud Storage locations for pipeline run artifacts and the exported model.
PIPELINE_ROOT_PATH = 'gs://vtx-root-path/'
MODEL_DIR = 'gs://vtx-models/custom-pipe/'

# Display names for the uploaded model and its serving endpoint.
MODEL_NAME = 'mpg-pipe'
ENDPOINT_NAME = 'mpg-endp-pip'

In [123]:
# List the Cloud Storage buckets of the active gcloud project, e.g. to confirm
# that the buckets behind PIPELINE_ROOT_PATH and MODEL_DIR exist.
!gsutil ls

gs://artifacts.jchavezar-demo.appspot.com/
gs://cloud-ai-platform-9b627e36-89f4-4b8c-bf59-1e00936392b5/
gs://jchavezar-demo_cloudbuild/
gs://vtx-artifacts/
gs://vtx-cpr/
gs://vtx-datasets-public/
gs://vtx-models/
gs://vtx-packages/
gs://vtx-pipelines/
gs://vtx-root-path/


## Create the Training Component

In [170]:
from kfp.v2.dsl import (component, Input, Artifact)

@component(
    packages_to_install=[
        'google-cloud-aiplatform',
        'pandas',
        'gcsfs',
        # NOTE(review): unpinned. With TensorFlow >= 2.16 (Keras 3), `model.save`
        # on a bare directory path raises and a SavedModel needs `model.export`;
        # consider pinning to match the tf2-cpu.2-6 serving image.
        'tensorflow'
    ])
def train(dataset: Input[Artifact], model_dir: str) -> str:
    """Train a small Keras regressor on the Auto MPG data and save it.

    All imports and helpers live inside the function body because a KFP
    lightweight component executes this function in its own container.

    Args:
        dataset: Artifact output of TabularDatasetCreateOp; its URI contains the
            Vertex AI dataset resource name (the part from 'projects/' onward).
        model_dir: Directory the model is saved to when the AIP_MODEL_DIR
            environment variable is not set.

    Returns:
        The directory the model was saved to (AIP_MODEL_DIR if set and
        non-empty, otherwise model_dir).
    """
    import os
    import warnings

    import pandas as pd
    import tensorflow as tf
    from google.cloud import aiplatform
    from tensorflow import keras
    from tensorflow.keras import layers

    warnings.filterwarnings('ignore')

    # Resolve the Vertex AI dataset from the artifact URI, then read the CSV it was created from.
    resource_name = 'projects/' + dataset.uri.split('projects/')[-1]
    vertex_dataset = aiplatform.TabularDataset(resource_name)
    gcs_uri = vertex_dataset.gca_resource.metadata['inputConfig']['gcsSource']['uri'][0]
    raw_df = pd.read_csv(gcs_uri)

    clean_df = raw_df.dropna().assign(
        Origin=lambda df: df['Origin'].map({1: 'USA', 2: 'Europe', 3: 'Japan'}))
    # dtype=float: since pandas 2.0 get_dummies defaults to bool columns, which
    # DataFrame.describe() skips; those columns would then get no mean/std and
    # norm() would turn them into NaN.
    encoded_df = pd.get_dummies(clean_df, prefix='', prefix_sep='', dtype=float)

    # Deterministic 80/20 train/test split.
    train_df = encoded_df.sample(frac=0.8, random_state=0)
    test_df = encoded_df.drop(train_df.index)

    train_labels = train_df.pop('MPG')
    test_labels = test_df.pop('MPG')

    # Normalization statistics come from the training features only (label already removed).
    train_stats = train_df.describe().transpose()

    def norm(x):
        """Standardize features with the training-split mean and std."""
        return (x - train_stats['mean']) / train_stats['std']

    normed_train_data = norm(train_df)
    normed_test_data = norm(test_df)

    def build_model():
        """Two hidden ReLU layers and a single linear output, trained with MSE loss."""
        model_ai = keras.Sequential([
            layers.Dense(64, activation='relu', input_shape=[len(train_df.keys())]),
            layers.Dense(64, activation='relu'),
            layers.Dense(1)
        ])
        optimizer = tf.keras.optimizers.RMSprop(0.001)
        model_ai.compile(loss='mse',
                         optimizer=optimizer,
                         metrics=['mae', 'mse'])
        return model_ai

    model = build_model()
    model.summary()
    EPOCHS = 1000

    # The patience parameter is the number of epochs to wait for val_loss to improve.
    early_stop = keras.callbacks.EarlyStopping(monitor='val_loss', patience=10)
    model.fit(normed_train_data, train_labels,
              epochs=EPOCHS, validation_split=0.2,
              callbacks=[early_stop])

    # Report error on the held-out 20% test split.
    _, test_mae, test_mse = model.evaluate(normed_test_data, test_labels, verbose=0)
    print(f'Test MAE: {test_mae:.3f}  Test MSE: {test_mse:.3f}')

    # AIP_MODEL_DIR, when set (e.g. by a Vertex AI custom training job), overrides model_dir.
    output_directory = os.environ.get('AIP_MODEL_DIR') or model_dir
    print(f'model_dir={model_dir}  saving to {output_directory}')

    model.save(output_directory)
    # The component is declared `-> str`; KFP needs an actual str for that
    # output, so return where the model was written.
    return output_directory

## Create a Pipeline

In [194]:
from kfp.v2.dsl import pipeline
from google_cloud_pipeline_components import aiplatform as gcc
import warnings
warnings.filterwarnings('ignore')

# NOTE: the pipeline function below rebinds the name `pipeline` (the decorator
# imported above); the import at the top of this cell restores it on re-run.
@pipeline(name='mpg-pipe')
def pipeline(
    display_name_ds: str,
    gcs_source: str,
    project: str,
    model_dir: str,
    model_display_name: str,
    region: str,
    endpoint_name: str
):
    """Create a tabular dataset, train, upload the model, and deploy it to a new endpoint.

    Every regional resource is created in `region` so the model and the
    endpoint it is deployed to live in the same location.
    """
    create_dataset_task = gcc.TabularDatasetCreateOp(
        display_name=display_name_ds,
        gcs_source=gcs_source,
        project=project,
        location=region,
    )
    custom_train_task = train(
        dataset=create_dataset_task.output,
        model_dir=model_dir,
    )
    # NOTE(review): artifact_uri assumes train() saved to model_dir, i.e.
    # AIP_MODEL_DIR was not set in the training container.
    upload_model_task = gcc.ModelUploadOp(
        display_name=model_display_name,
        project=project,
        location=region,
        serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest',
        artifact_uri=model_dir
    )
    # artifact_uri is a plain parameter, so the ordering dependency on training is explicit.
    upload_model_task.after(custom_train_task)
    create_endpoint_task = gcc.EndpointCreateOp(
        display_name=endpoint_name,
        project=project,
        location=region,
    )
    deploy_model_task = gcc.ModelDeployOp(
        endpoint=create_endpoint_task.outputs['endpoint'],
        model=upload_model_task.outputs['model'],
        traffic_split={'0': 100},
        dedicated_resources_machine_type='n1-standard-2',
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1
    )
    

## Compile Pipeline

In [195]:
from kfp.v2 import compiler

# Serialize the pipeline definition to a JSON spec that PipelineJob can submit.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path='mpg_pipe.json',
)

## Run Pipeline Job

In [197]:
import google.cloud.aiplatform as aip

# Pin the job to the configured project/region instead of relying on the
# ambient gcloud defaults, which may differ from PROJECT_ID / REGION.
job = aip.PipelineJob(
    display_name="mpg-pipe-job",
    template_path="mpg_pipe.json",
    pipeline_root=PIPELINE_ROOT_PATH,
    project=PROJECT_ID,
    location=REGION,
    parameter_values={
        'display_name_ds': DATASET_NAME,
        'gcs_source': DATASET_GCS_SOURCE,
        'project': PROJECT_ID,
        'model_dir': MODEL_DIR,
        'model_display_name': MODEL_NAME,
        'region': REGION,
        'endpoint_name': ENDPOINT_NAME
    }
)

# Submit asynchronously; the job runs on Vertex AI Pipelines.
job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/569083142710/locations/us-central1/pipelineJobs/mpg-pipe-20220613205603
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/569083142710/locations/us-central1/pipelineJobs/mpg-pipe-20220613205603')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/mpg-pipe-20220613205603?project=569083142710
