
## Tarea 1 Vertex - Michael H.

# Dependencias

In [None]:
!pip install kfp google-cloud-aiplatform==1.18.1 tensorflow

In [None]:
! pip install -U google-cloud-aiplatform "shapely<2"

# Importar y variables de entorno

In [None]:
import tensorflow as tf
from tensorflow import keras

In [None]:
import os
from datetime import datetime
import os

# GCP settings
# Path to a service-account key file; leave empty to use Application Default
# Credentials. The env var is only set when a path is given. google-auth treats
# any set GOOGLE_APPLICATION_CREDENTIALS value (even "") as a key-file path and
# fails to load "". Assigning it unconditionally would also clobber a value the
# user already exported.
CREDENTIALS_PATH = ''
if CREDENTIALS_PATH:
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = CREDENTIALS_PATH
PIPELINE_ID = os.getenv("PIPELINE_CONFIG_ID")
PIPELINE_DISPLAY_NAME = os.getenv("PIPELINE_DISPLAY_NAME", "mypipeline")
PIPELINE_COMPILE_FILE = os.getenv("PIPELINE_COMPILE_FILE", './pipeline_compile.json')
PIPELINE_SERVICE_ACCOUNT = os.getenv("PIPELINE_SERVICE_ACCOUNT", "")
PIPELINE_PROJECT_ID = os.getenv("PIPELINE_PROJECT_ID", "")
PIPELINE_REGION = os.getenv("PIPELINE_REGION", "")
PIPELINE_BUCKET = os.getenv("PIPELINE_BUCKET", "")
PIPELINE_PATH_ROOT = os.getenv("PIPELINE_PATH_ROOT", "locale_root")
# Service account for pipeline submission; empty means none is set explicitly.
SERVICE_ACCOUNT = ''


# Derived values used throughout the notebook.
PROJECT_ID = PIPELINE_PROJECT_ID
BUCKET_NAME = f"gs://{PIPELINE_BUCKET}"
REGION = PIPELINE_REGION
# GCS root for pipeline artifacts; meant to be passed as PipelineJob(pipeline_root=...).
PIPELINE_ROOT = os.path.join(BUCKET_NAME, PIPELINE_PATH_ROOT)


# Vertex pipeline settings
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
DISPLAY_NAME = 'pipeline-houseprice-job{}'.format(TIMESTAMP)
# NOTE(review): not referenced by the cells in this notebook, and its keys do
# not match the parameters of pipeline() (project, region, ...).
PIPELINE_PARAMS = {"project_id": PROJECT_ID,
                   "dataset_location": REGION,
                }
TEMPLATE_PATH = "housing_pipeline.json"
PIPELINE_NAME = "housing_price"
JOBID = f"training-pipeline-{TIMESTAMP}"
ENABLE_CACHING = False


# Component dependencies
# NOTE(review): confirm that BASE_IMAGE's Python version can install these pins
# (tensorflow 2.16 requires a recent Python 3).
IMAGE_NAME = "training"
BASE_IMAGE = "gcr.io/cloud-aiplatform/prediction/sklearn-cpu.0-23:latest"
PANDAS = "pandas==1.3.2"
SKLEARN = "scikit-learn==1.0.2"
NUMPY = "numpy==1.21.6"
TENSORFLOW = "tensorflow==2.16.1"

In [None]:
import google.cloud.aiplatform as aiplatform

from kfp.v2 import dsl, compiler

from kfp.v2.dsl import (component, Input, Model, Output, Dataset)

# COMPONENTES - VERTEX AI

# LOAD DATA

El primer componente lee el archivo `BostonHousing.csv` desde Cloud Storage y lo carga en un DataFrame de pandas.

In [None]:
@component(
    base_image=BASE_IMAGE,
    output_component_file="get_data.yaml",
    packages_to_install=[PANDAS],
)
def get_houseprice_data(
    filepath: str,
    dataset: Output[Dataset]
):
    """Read ``BostonHousing.csv`` and publish it as a Dataset artifact.

    Args:
        filepath: Folder that contains ``BostonHousing.csv`` (the pipeline
            passes a ``gs://`` prefix).
        dataset: Output artifact; the CSV is written to ``dataset.path``
            without the pandas index column.
    """
    import pandas as pd

    # NOTE(review): pandas needs gcsfs to read gs:// URLs. Confirm BASE_IMAGE
    # provides it, or add it to packages_to_install.
    # rstrip avoids a double slash when the caller passes a trailing "/".
    df = pd.read_csv(filepath.rstrip("/") + "/BostonHousing.csv")

    # Without this write, downstream components receive an empty artifact.
    df.to_csv(dataset.path, index=False)

# SPLIT DATA AND TRAIN MODEL

Este componente divide los datos en un conjunto de entrenamiento y otro de validación, y luego entrena la red neuronal.

In [None]:
@component(base_image=BASE_IMAGE, packages_to_install=[PANDAS, TENSORFLOW],
           output_component_file="model_training.yaml")
def train_houseprice(
    dataset_in: Input[Dataset],
    model: Output[Model]
):
    """Train a small dense regression network on the housing CSV.

    Columns 0-12 are used as features and column 13 as the target. The first
    450 rows are the training set and all remaining rows the validation set.
    The trained network is saved as ``housing.h5`` inside the ``model``
    output directory.

    Args:
        dataset_in: CSV artifact (with header row) produced by the data component.
        model: Output artifact that receives the saved Keras model.
    """
    import os

    import pandas as pd
    from tensorflow.keras.layers import Dense
    from tensorflow.keras.models import Sequential

    n_features = 13
    n_train = 450

    def build_model():
        """Build and compile the 13 -> 20 -> 13 -> 6 -> 1 regression network."""
        net = Sequential()
        net.add(Dense(20, input_shape=(n_features,), kernel_initializer='normal', activation='relu'))
        net.add(Dense(13, kernel_initializer='normal', activation='relu'))
        net.add(Dense(6, kernel_initializer='normal', activation='relu'))
        net.add(Dense(1, kernel_initializer='normal'))
        # 'accuracy' is meaningless for a continuous target; track MAE instead.
        net.compile(loss='mean_squared_error', optimizer='adam', metrics=['mae'])
        return net

    # dataset_in is an artifact handle, not an array: load the CSV from its path.
    data = pd.read_csv(dataset_in.path).to_numpy(dtype="float32")
    X = data[:, 0:n_features]
    Y = data[:, n_features]

    X_train, Y_train = X[:n_train], Y[:n_train]
    # Start at n_train so no row falls between the two splits.
    X_val, Y_val = X[n_train:], Y[n_train:]

    # Named keras_model so it does not shadow the `model` output artifact.
    keras_model = build_model()
    keras_model.fit(X_train, Y_train, epochs=100, batch_size=5,
                    validation_data=(X_val, Y_val), verbose=1)

    # Save into the artifact location so the next component can find it.
    os.makedirs(model.path, exist_ok=True)
    keras_model.save(os.path.join(model.path, 'housing.h5'))

# UPLOAD MODEL

Se encarga de registrar el modelo de red neuronal en Vertex AI Model Registry; si ya existe un modelo con el mismo nombre, se sube como una nueva versión.

In [None]:
@component(
    base_image=BASE_IMAGE,
    # NOTE(review): install_kfp_package=False requires BASE_IMAGE to already
    # ship kfp and google-cloud-aiplatform — confirm.
    install_kfp_package=False,
    output_component_file="model.yaml",
)
def upload_houseprice(
        serving_container_image_uri: str,
        display_name: str,
        gcp_project: str,
        gcp_region: str,
        model: Input[Model],
        uploaded_model: Output[Model],
        vertex_model: Output[Model]
):
    """Register the trained model in Vertex AI Model Registry.

    If models with ``display_name`` already exist, the upload becomes a new
    version of the most recently created one. Otherwise a new model is created.

    Args:
        serving_container_image_uri: Prediction container used to serve the model.
        display_name: Model display name; also used to look up existing versions.
        gcp_project: GCP project ID.
        gcp_region: Vertex AI region.
        model: Trained model artifact; its ``uri`` is used as ``artifact_uri``.
        uploaded_model: Output artifact; ``uri`` is set to the Vertex model
            resource name.
        vertex_model: Output artifact; ``uri`` is set to the Vertex model
            resource name.
    """
    from google.cloud import aiplatform as vertex_ai

    existing_models = vertex_ai.Model.list(
        filter='display_name="{}"'.format(display_name),
        order_by='create_time desc',
        project=gcp_project,
        location=gcp_region,
    )
    # Thanks to order_by, index 0 is the most recently created model.
    parent_model = existing_models[0].resource_name if existing_models else None

    model_upload = vertex_ai.Model.upload(
        display_name=display_name,
        parent_model=parent_model,
        # model.uri is the gs:// location; model.path is only a local mount path.
        artifact_uri=model.uri,
        serving_container_image_uri=serving_container_image_uri,
        project=gcp_project,
        location=gcp_region,
        serving_container_predict_route="/predict",
        serving_container_health_route="/health",
    )

    # Rebinding an Output parameter has no effect; set its fields instead.
    uploaded_model.uri = model_upload.resource_name
    vertex_model.uri = model_upload.resource_name


# ENDPOINT CREATE

Despliega el modelo en un endpoint de Vertex AI, que expone una URL para consumirlo.

In [None]:
@component(
    base_image=BASE_IMAGE,
    # NOTE(review): install_kfp_package=False requires BASE_IMAGE to already
    # ship kfp and google-cloud-aiplatform — confirm.
    install_kfp_package=False,
    output_component_file="model_deployment.yaml",
)
def deploy_houseprice(
        display_name: str,
        model_endpoint: str,
        gcp_project: str,
        gcp_region: str,
        vertex_model: Input[Model],
        uploaded_model: Input[Model],
        vertex_endpoint: Output[Model]
):
    """Deploy the registered model to a Vertex AI endpoint.

    Reuses the most recently created endpoint named ``model_endpoint``, or
    creates one. It then deploys the model on an ``n1-standard-2`` machine
    with all traffic routed to it.

    Args:
        display_name: Display name of the deployed model.
        model_endpoint: Display name of the endpoint to reuse or create.
        gcp_project: GCP project ID.
        gcp_region: Vertex AI region.
        vertex_model: Artifact whose ``uri`` holds the Vertex model resource name.
        uploaded_model: Not read; kept so the component signature is unchanged.
        vertex_endpoint: Output artifact; ``uri`` is set to the endpoint
            resource name returned by ``deploy``.
    """
    from google.cloud import aiplatform as vertex_ai

    endpoints = vertex_ai.Endpoint.list(
        filter='display_name="{}"'.format(model_endpoint),
        order_by='create_time desc',
        project=gcp_project,
        location=gcp_region,
    )
    if endpoints:
        endpoint = endpoints[0]  # most recently created
    else:
        endpoint = vertex_ai.Endpoint.create(
            display_name=model_endpoint,
            project=gcp_project,
            location=gcp_region,
        )

    # Input artifacts are metadata handles with no .deploy() method, so rebuild
    # the SDK Model from the resource name stored in the artifact's URI.
    registered_model = vertex_ai.Model(
        model_name=vertex_model.uri,
        project=gcp_project,
        location=gcp_region,
    )

    # Deploy the trained model to the endpoint; "0" is the model being deployed.
    deployed_endpoint = registered_model.deploy(
        endpoint=endpoint,
        machine_type='n1-standard-2',
        traffic_split={"0": 100},
        deployed_model_display_name=display_name,
    )

    # Save data to the output params
    vertex_endpoint.uri = deployed_endpoint.resource_name




## Crear Pipeline

Finalmente se crea el pipeline que conecta los componentes ya mencionados.

In [None]:
@dsl.pipeline(
    name="pipeline-houseprice"
)
def pipeline(
    data_filepath: str = f"{BUCKET_NAME}/data",
    project: str = PROJECT_ID,
    region: str = REGION,
    display_name: str = DISPLAY_NAME,
    serving_container_image_uri: str = BASE_IMAGE
):
    """Load data -> train -> register model -> deploy to an endpoint.

    Args:
        data_filepath: Folder containing ``BostonHousing.csv``.
        project: GCP project ID used by the upload and deploy steps.
        region: Vertex AI region used by the upload and deploy steps.
        display_name: Currently unused. The model keeps the fixed name
            "houseprice" so new uploads are registered as versions of it.
        serving_container_image_uri: Prediction container for the model.
            NOTE(review): the default is an sklearn image; confirm it can serve
            the Keras model.
    """
    # KFP v2 components only accept keyword arguments.
    data_op = get_houseprice_data(filepath=data_filepath)
    train_op = train_houseprice(dataset_in=data_op.outputs["dataset"])

    # Use the pipeline parameters (not module constants) so run-time overrides apply.
    upload_model_op = upload_houseprice(
        model=train_op.outputs["model"],
        gcp_project=project,
        gcp_region=region,
        serving_container_image_uri=serving_container_image_uri,
        display_name="houseprice",
    )

    deploy_houseprice(
        uploaded_model=upload_model_op.outputs["uploaded_model"],
        vertex_model=upload_model_op.outputs["vertex_model"],
        gcp_project=project,
        gcp_region=region,
        display_name="houseprice",
        model_endpoint="houseprice_endpoint",
    )



## Compilar y correr Pipeline

A continuación se compila el pipeline y se ejecuta en Vertex AI Pipelines (GCP).

In [None]:
# Serialize the pipeline definition into the JSON job spec at TEMPLATE_PATH.
compiler.Compiler().compile(package_path=TEMPLATE_PATH, pipeline_func=pipeline)

In [None]:
# Set the default project and region for the aiplatform SDK calls below.
aiplatform.init(location=REGION, project=PROJECT_ID)

In [None]:
# RUN THE PIPELINE
# PIPELINE_ROOT from the config cell is passed explicitly so run artifacts are
# written under the configured bucket.
pipeline_ = aiplatform.pipeline_jobs.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=TEMPLATE_PATH,
    job_id=JOBID,
    pipeline_root=PIPELINE_ROOT,
    enable_caching=ENABLE_CACHING,
    project=PROJECT_ID,
    location=REGION,
)

# An empty SERVICE_ACCOUNT is passed as None so no service account is set explicitly.
pipeline_.submit(service_account=SERVICE_ACCOUNT or None)