In [10]:
from datetime import datetime
import os
import google.cloud.aiplatform as aip
import json
from string import Template

import google.auth
from google.auth import impersonated_credentials, transport

from typing import NamedTuple

import kfp
from google_cloud_pipeline_components import aiplatform as gcc_aip
from google_cloud_pipeline_components.v1 import bigquery as gcc_bq

from kfp.v2 import dsl, compiler
from kfp.v2.dsl import (Artifact, ClassificationMetrics, Input, Metrics, Output, component, Dataset)
from google_cloud_pipeline_components.experimental.custom_job import utils

from kfp.v2.components import importer_node
from google_cloud_pipeline_components.types import artifact_types

from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp

In [11]:
@component
def error_op(msg: str):
    """Fail the current pipeline step with a descriptive message.

    Args:
        msg: Human-readable reason for the failure.

    Raises:
        RuntimeError: Always, carrying ``msg`` as its message.
    """
    # Raising a bare string is a TypeError in Python 3 ("exceptions must
    # derive from BaseException") and would hide `msg`; raise a real
    # exception so the failure reason is reported as intended.
    raise RuntimeError(msg)

In [19]:
@component(
    packages_to_install=[
        "google-cloud-bigquery",
        "google-cloud-bigquery-storage",
        "pandas",
        "scikit-learn",
        "joblib",
        "db-dtypes",
        "pyarrow",
        "pandas-gbq",
        "google-cloud-storage",
        "pytz"
    ],
)
def prediction(
    project: str,
    source_x_train_table: str,
    features_table: str,
    table_id: str,
    path_model: str,
):
    """Score a BigQuery table with a joblib model from GCS and append the result.

    Args:
        project: GCP project used for the BigQuery client and for the
            ``pandas_gbq`` write.
        source_x_train_table: BigQuery table holding the rows to score.
        features_table: BigQuery table whose ``string_field_0`` column lists
            the names of the columns fed to the model.
        table_id: Destination BigQuery table; rows are appended.
        path_model: Location of the joblib model file, in the form
            ``gs://<bucket>/<object>``.

    Raises:
        ValueError: If ``path_model`` has no bucket or object part, or if any
            listed feature column is absent from the source table.
    """
    # Imports live inside the function because the component body runs on its own.
    from datetime import datetime
    from io import BytesIO

    import pandas as pd
    import pandas_gbq
    from google.cloud import bigquery
    from google.cloud import storage
    from joblib import load
    from pytz import timezone

    tz = timezone('America/Lima')

    # BigQuery client
    client = bigquery.Client(project=project)

    def read_table(table_name):
        # The table name is interpolated into the SQL text, so it must come
        # from a trusted pipeline parameter.
        return client.query(
            '''SELECT * FROM `{dsource_table}`
        '''.format(dsource_table=table_name)).to_dataframe()

    # Read the rows to score from BigQuery
    X_train = read_table(source_x_train_table)

    # Read the selected feature names from BigQuery
    features = read_table(features_table)["string_field_0"].tolist()

    # Fail early with an actionable message instead of an opaque KeyError.
    missing_features = [name for name in features if name not in X_train.columns]
    if missing_features:
        raise ValueError(
            "Columns listed in {features} are missing from {source}: {missing}".format(
                features=features_table,
                source=source_x_train_table,
                missing=missing_features,
            )
        )

    X_train = X_train[features]

    def load_model_from_gcs(path_model):
        # Split "gs://bucket/path/to/object" into bucket and object names.
        location = path_model.replace("gs://", "")
        bucket_name, _, blob_name = location.partition("/")
        if not bucket_name or not blob_name:
            raise ValueError(
                "path_model must look like gs://<bucket>/<object>, got: {!r}".format(path_model)
            )

        # Fetch the object from Cloud Storage
        storage_client = storage.Client()
        blob = storage_client.bucket(bucket_name).blob(blob_name)
        # download_as_string() is a deprecated alias; download_as_bytes()
        # returns the same bytes payload.
        model_bytes = blob.download_as_bytes()

        # NOTE(review): joblib files are pickle-based, so loading executes
        # arbitrary code from the artifact; only point this at trusted models.
        return load(BytesIO(model_bytes))

    classifier = load_model_from_gcs(path_model)

    # Run the prediction
    predictions = classifier.predict(X_train)
    predictions = pd.DataFrame(predictions, columns=['prediction'])

    # Identity of the current BigQuery session, kept as an audit field
    user_id = client.query("SELECT SESSION_USER()").to_dataframe().iloc[0, 0]

    # Audit fields. load_date is a naive timestamp from datetime.now();
    # process_date is the calendar date in the America/Lima timezone.
    load_timestamp = datetime.now()
    process_date = datetime.now(tz).date()

    predictions['creation_user'] = user_id
    predictions['process_date'] = process_date
    predictions['load_date'] = pd.to_datetime(load_timestamp)

    # Append the result to BigQuery
    pandas_gbq.to_gbq(predictions, table_id, if_exists='append', project_id=project)

    print("Predicción generada y guardada en BigQuery.")

In [20]:
@kfp.dsl.pipeline(
    name="intro",
    description="intro",
    pipeline_root="gs://<bucket>/demo",
)
def main_pipeline(
    project: str,
    source_x_train_table: str,
    features_table: str,
    table_id: str,
    path_model: str,
    gcp_region: str = "us-central1",
):
    """Validate the source table and, if valid, run the prediction component.

    An e-mail notification task is registered as the exit handler, so it wraps
    every other step. The validation step's ``condition`` output selects one of
    two branches: ``"false"`` runs ``error_op``, ``"true"`` runs ``prediction``.

    Args:
        project: Forwarded to ``prediction``.
        source_x_train_table: Forwarded to ``validate_data`` and ``prediction``.
        features_table: Forwarded to ``prediction``.
        table_id: Forwarded to ``prediction``.
        path_model: Forwarded to ``prediction``.
        gcp_region: Not referenced inside the pipeline body.
    """
    # Exit task: notification e-mail attached to the handler below.
    email_on_exit = VertexNotificationEmailOp(recipients=["laybarm@intercorp.com.pe"])
    email_on_exit.set_display_name('Notification Email')

    with dsl.ExitHandler(email_on_exit, name="Execute pipeline prediction"):

        # `validate_data` is expected to be defined in another notebook cell.
        validation_task = validate_data(source_x_train_table=source_x_train_table)
        validation_task.set_display_name('Validate Data')

        # Both branches switch on the same string-valued output.
        validation_flag = validation_task.outputs['condition']

        # Validation failed: stop with an explicit error step.
        with dsl.Condition(validation_flag == "false", name="no-execute"):
            error_op("No se logro validar las tablas de ingesta.")

        # Validation passed: score the data.
        with dsl.Condition(validation_flag == "true", name="execute"):
            prediction_task = prediction(
                project=project,
                source_x_train_table=source_x_train_table,
                features_table=features_table,
                table_id=table_id,
                path_model=path_model,
            )
            prediction_task.set_display_name("PREDICTION_MODEL")

In [None]:
from kfp.v2 import compiler as v2_compiler

# Compile `main_pipeline` into a local pipeline spec file.
v2_compiler.Compiler().compile(pipeline_func=main_pipeline, package_path="pipeline_prediction.json")

In [None]:
from google.cloud import storage

def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a Cloud Storage bucket and print a confirmation.

    Args:
        bucket_name: Name of the destination bucket.
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object name to write inside the bucket.
    """
    target_blob = storage.Client().bucket(bucket_name).blob(destination_blob_name)
    target_blob.upload_from_filename(source_file_name)
    print(f"Archivo {source_file_name} subido a {destination_blob_name} en el bucket {bucket_name}.")

# Upload settings: local spec file, destination bucket and object name.
pipeline_file = "pipeline_prediction.json"
bucket_name = "<bucket>"
destination_blob_name = "demo/pipeline_prediction.json"

# Push the compiled pipeline spec to Cloud Storage.
upload_to_gcs(
    bucket_name=bucket_name,
    source_file_name=pipeline_file,
    destination_blob_name=destination_blob_name,
)

In [23]:
from google.cloud import aiplatform
# Initialise the Vertex AI SDK with the project and region used below.
aiplatform.init(
    project="<project_id>",
    location="us-central1",
)

In [None]:
from google.cloud import aiplatform

# Build the pipeline job from the spec uploaded to Cloud Storage.
# Optional arguments previously left disabled in this cell:
#   job_id="mlops72"
#   labels={"module": "ml", "application": "app", "chapter": "mlops", "company": "datapat", "environment": "dev", "owner": "xxxx"}
job = aiplatform.PipelineJob(
    display_name="pipeline de prueba",
    template_path="gs://<bucket>/demo/pipeline_prediction.json",
    enable_caching=False,
    project="<project_id>",
    location="us-central1",
    # Keys match the parameter names of `main_pipeline`.
    parameter_values=dict(
        project="<project_id>",
        source_x_train_table="<project_id>.<dataset>.xtrain",
        features_table="<project_id>.<dataset>.selected_features",
        table_id="<project_id>.<dataset>.predictions",
        path_model="gs://<bucket>/demo/data/model/model.joblib",
    ),
    failure_policy='slow',
)

print('submit pipeline job ...')
# Submit the job under the dedicated service account.
job.submit(service_account="dev-dp-ml-vertex@<project_id>.iam.gserviceaccount.com")