In [23]:
import os
import kfp
from google.cloud import aiplatform
from kfp.v2 import dsl, compiler
from kfp.v2.dsl import (Artifact, ClassificationMetrics, Input, Metrics, Output, component,Dataset)
from google.cloud import storage
from typing import NamedTuple
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp

In [24]:
# Point the Google client libraries at the service-account key file.
# The path is relative to the notebook's working directory.
os.environ.update({"GOOGLE_APPLICATION_CREDENTIALS": 'user-demo.json'})

In [25]:
@component
def error_op(msg: str):
    """Fail the pipeline step on purpose, reporting ``msg`` as the error.

    Args:
        msg: Human-readable reason for the failure.

    Raises:
        RuntimeError: Always, carrying ``msg``.
    """
    # A bare string cannot be raised: `raise(msg)` ended in
    # "TypeError: exceptions must derive from BaseException" and hid the message.
    raise RuntimeError(msg)

In [26]:
@component(packages_to_install=['google-cloud-storage'])
def validate_data(
    file_path: str,
)-> NamedTuple(
    "Outputs",
    [
        ("condition", str)]
):
    """Check that the input object exists in Cloud Storage.

    The result is consumed by the pipeline's conditions to decide whether the
    ingestion step runs or the pipeline is failed explicitly.

    Args:
        file_path: Object location, e.g. ``gs://bucket/path/to/file.csv``.

    Returns:
        A one-element tuple ``(condition,)`` where ``condition`` is the string
        ``"true"`` when the object's metadata could be fetched, ``"false"``
        otherwise (malformed path, missing object, or any other error).
    """
    from google.cloud import storage

    # partition() never raises, so a path without "/" is reported as invalid
    # instead of crashing on tuple unpacking.
    bucket_name, _, blob_name = file_path.replace("gs://", "").partition("/")
    if not bucket_name or not blob_name:
        print(f"Invalid GCS path (expected gs://bucket/object): {file_path!r}")
        return ("false",)

    storage_client = storage.Client()
    blob = storage_client.bucket(bucket_name).blob(blob_name)

    try:
        # reload() fetches the object's metadata and fails if it cannot.
        blob.reload()
        condition = "true"
    except Exception as exc:
        # Keep the best-effort behaviour, but leave the reason in the step logs.
        print(f"Could not validate {file_path!r}: {exc!r}")
        condition = "false"

    return (condition,)

In [27]:
@component(
    packages_to_install=[
        "google-cloud-bigquery",
        "google-cloud-bigquery-storage",
        "pandas",
        "db-dtypes",
        "pyarrow",
        "google-cloud-storage",
    ],
)
def clean_and_input_data(
    project: str,
    table_id: str,
    path_csv: str,
    path_json: str,
):
    """Load a CSV from Cloud Storage, drop incomplete rows and write it to BigQuery.

    Args:
        project: Project used to create the BigQuery client.
        table_id: Destination table passed to ``load_table_from_dataframe``.
        path_csv: ``gs://`` location of the CSV file to ingest.
        path_json: ``gs://`` location of a JSON document with a ``schema`` list
            (entries with ``name``, ``type`` and optionally ``mode``) and a
            ``writeDisposition`` value.

    Raises:
        Any exception raised while downloading the files, parsing them or
        running the load job is propagated, failing the step.
    """
    import json
    from io import BytesIO

    import pandas as pd
    from google.cloud import bigquery
    from google.cloud import storage

    client = bigquery.Client(project=project)
    # One storage client is enough for both downloads.
    storage_client = storage.Client()

    def load_from_gcs(path):
        """Return the raw bytes of the object at ``gs://bucket/object``."""
        bucket_name, blob_name = path.replace("gs://", "").split("/", 1)
        blob = storage_client.bucket(bucket_name).blob(blob_name)
        # download_as_string() is a deprecated alias of download_as_bytes().
        return blob.download_as_bytes()

    raw_data = pd.read_csv(BytesIO(load_from_gcs(path_csv)))
    schema = json.loads(load_from_gcs(path_json))

    # Cleaning step: discard every row that has at least one missing value.
    clean_data = raw_data.dropna()

    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField(
                name=field['name'],
                field_type=field['type'],
                # A schema entry without "mode" falls back to NULLABLE.
                mode=field.get('mode', 'NULLABLE'),
            )
            for field in schema['schema']
        ],
        write_disposition=schema['writeDisposition'],
        source_format=bigquery.SourceFormat.CSV
    )

    load_job = client.load_table_from_dataframe(clean_data, table_id, job_config=job_config)

    # Block until the load job finishes; result() raises if the job failed.
    load_job.result()

    print("Información guardada en BigQuery.")

In [33]:
# Pipeline definition: check that the input CSV exists in Cloud Storage and,
# only when it does, clean it and load it into BigQuery. When the check fails
# the pipeline is failed explicitly through error_op.
@kfp.dsl.pipeline(
    name="pipelineinsertbq", 
    description="",
    pipeline_root="gs://vertex-datapath/demo"
)

def main_pipeline(
    project: str,
    table_id: str,
    path_csv: str,
    path_json: str,
    gcp_region: str = "us-central1",  # not referenced anywhere in the pipeline body
):
    
    # Notification e-mail task; it is handed to the ExitHandler below as the exit task.
    notify_email_task = VertexNotificationEmailOp(recipients=["secabezon21@gmail.com"])
    notify_email_task.set_display_name('Notification Email')
    
    # Everything inside this context is wrapped by the exit handler.
    with dsl.ExitHandler(notify_email_task, name="Execute pipeline clean and insert"):

        # Step 1: produce the string output "condition" ("true" / "false") for the CSV path.
        validate_tables_job = validate_data(
            file_path = path_csv
        )
        validate_tables_job.set_display_name('Validate Data')

        # Branch taken when validation reported "false": fail the run with a message.
        with dsl.Condition(
            validate_tables_job.outputs['condition']=="false",
            name="no-execute",
        ):
            error_op("No se logro validar las tablas de ingesta.")


        # Branch taken when validation reported "true": run the clean-and-load step.
        with dsl.Condition(
            validate_tables_job.outputs['condition']=="true",
            name="execute",
        ):
  
            clean_data=clean_and_input_data(
                project= project,
                table_id= table_id,
                path_csv= path_csv,
                path_json= path_json,
            )
            clean_data.set_display_name("clean_input_data")

In [34]:
# Compile the pipeline function into a local JSON spec.
# Author's note: the compiled file must be in GCS.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    package_path="clean_input_data.json",
    pipeline_func=main_pipeline,
)

In [35]:
def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a Cloud Storage bucket and print a confirmation.

    Args:
        bucket_name: Name of the destination bucket.
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object name the file gets inside the bucket.
    """
    # Client -> bucket reference -> destination blob, resolved in one chain.
    target_blob = storage.Client().bucket(bucket_name).blob(destination_blob_name)
    target_blob.upload_from_filename(source_file_name)
    print(f"Archivo {source_file_name} subido a {destination_blob_name} en el bucket {bucket_name}.")

# Upload settings: the local compiled spec and its destination in Cloud Storage.
pipeline_file = "clean_input_data.json"
bucket_name = "vertex-datapath"
destination_blob_name = "demo/modelo/clean_input_data.json"

# Call the helper to upload the file.
upload_to_gcs(
    bucket_name=bucket_name,
    source_file_name=pipeline_file,
    destination_blob_name=destination_blob_name,
)

Archivo clean_input_data.json subido a demo/modelo/clean_input_data.json en el bucket vertex-datapath.


In [36]:
# Set the project and region used by the Vertex AI SDK calls that follow.
aiplatform.init(
    location="us-central1",
    project="datapathdeployfastapi",
)

In [37]:
# Runtime arguments; the keys match the parameters of main_pipeline.
pipeline_parameters = {
    "project": "datapathdeployfastapi",
    "table_id": "datapathdeployfastapi.proyectoPred.xtestdata",
    "path_csv": "gs://vertex-datapath/demo/data/pipeline-s6/xtest.csv",
    "path_json": "gs://vertex-datapath/demo/schema/schema.json",
}

# Resource labels attached to the pipeline job.
job_labels = {
    "module": "sec",
    "application": "app",
    "chapter": "mlops",
    "company": "datapath",
    "environment": "dev",
    "owner": "xxxx",
}

# Author's note: this should live in a Cloud Function, not in Vertex, for it to work.
job = aiplatform.PipelineJob(
    display_name="pipeline de limpieza",
    template_path="clean_input_data.json",  # author's note: the path must point to GCS
    enable_caching=False,
    project="datapathdeployfastapi",
    location="us-central1",
    parameter_values=pipeline_parameters,
    labels=job_labels,
)

print('submit pipeline job ...')
job.submit(service_account="dev-mlops@datapathdeployfastapi.iam.gserviceaccount.com")

submit pipeline job ...
Creating PipelineJob
PipelineJob created. Resource name: projects/848324577645/locations/us-central1/pipelineJobs/pipelineinsertbq-20241022220752
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/848324577645/locations/us-central1/pipelineJobs/pipelineinsertbq-20241022220752')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipelineinsertbq-20241022220752?project=848324577645
