In [17]:
# Import packages
import kfp
from google.cloud import aiplatform
from kfp.v2 import dsl, compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, component, importer)
from typing import NamedTuple
from google.cloud import storage
import pandas as pd
import gcsfs


In [18]:
# Notebook-wide configuration: GCP project, region, bucket and the service
# account used to run the pipeline.
PROJECT_ID = "datapath-deploy-api-v1-434102"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}
# Plain string literal: there is nothing to interpolate, so no f-prefix.
BUCKET_URI = "gs://demo_vertext_01"  # @param {type:"string"}
SERVICE_ACCOUNT = "dev-mlops-vertex@datapath-deploy-api-v1-434102.iam.gserviceaccount.com"

In [19]:
# Bucket-creation command, left commented out:
# ! gsutil mb -l {LOCATION} -p {PROJECT_ID} {BUCKET_URI}

# Folder inside the bucket that is handed to the pipeline as its root.
PIPELINE_ROOT = f"{BUCKET_URI}/output_info"

In [20]:
# Initialise the Vertex AI SDK. The region is passed explicitly so this call
# uses the configured LOCATION instead of whatever the SDK defaults to.
aiplatform.init(project=PROJECT_ID, location=LOCATION, staging_bucket=BUCKET_URI)

##### Preprocess data

In [33]:
@component(base_image="python:3.9", packages_to_install=["pandas", "google-cloud-storage", "gcsfs"])
def preprocess_data(
    input_csv_path: str,
    output_folder_path: str,
    output_csv_name: str,
):
    """Read a CSV from GCS and write it back to another GCS location.

    The file at ``input_csv_path`` is loaded into a pandas DataFrame, its
    first rows are printed to the step log, and the DataFrame is written as
    CSV (without the index column) to
    ``<output_folder_path>/<output_csv_name>``. No transformation is applied
    to the data yet.

    Args:
        input_csv_path: GCS path of the comma-separated source file.
        output_folder_path: GCS folder that receives the output file. Trailing
            slashes are ignored so the resulting path never contains ``//``.
        output_csv_name: File name of the CSV written inside the output folder.
    """
    # Imports are kept inside the function body: a KFP lightweight component
    # is executed on its own, without this notebook's top-level imports.
    import pandas as pd
    import gcsfs

    # NOTE(review): no credentials are passed explicitly; presumably the
    # ambient credentials of the step's runtime are used - confirm.
    fs = gcsfs.GCSFileSystem()

    # Load the source CSV from GCS.
    with fs.open(input_csv_path) as f:
        df = pd.read_csv(f, sep=",")
    print(df.head())

    # Build the destination path; strip trailing slashes from the folder so
    # "gs://bucket/dir/" and "gs://bucket/dir" produce the same object name.
    output_csv_path = f"{output_folder_path.rstrip('/')}/{output_csv_name}"

    # Write the DataFrame to the destination CSV in GCS.
    with fs.open(output_csv_path, 'w') as f:
        df.to_csv(f, index=False)

    print(f"Archivo procesado guardado en: {output_csv_path}")
        

#### Definir un pipeline de test

In [34]:
@dsl.pipeline(
    name="test-pipeline-2",
    pipeline_root=PIPELINE_ROOT,
)
def csv_preprocessing_pipeline(
    project: str = PROJECT_ID,
    gcp_region: str = LOCATION,
):
    """Single-step test pipeline that runs ``preprocess_data`` on one CSV.

    The step reads ``raw_info/ISA_Historical_Info_2002_2024.csv`` from the
    configured bucket and writes
    ``output_info/processed_ISA_Historical_Info_2002_2024.csv`` to the same
    bucket.

    Args:
        project: GCP project id. Declared as a pipeline parameter but not
            used by any step yet.
        gcp_region: GCP region. Declared as a pipeline parameter but not used
            by any step yet.
    """
    # Paths are derived from BUCKET_URI so the bucket is configured in one
    # place only; the resulting values match the previous hard-coded literals.
    preprocess_data(
        input_csv_path=f"{BUCKET_URI}/raw_info/ISA_Historical_Info_2002_2024.csv",
        output_folder_path=f"{BUCKET_URI}/output_info",
        output_csv_name="processed_ISA_Historical_Info_2002_2024.csv",
    )


#### Compilar el pipeline de test:

In [35]:
# Compile the pipeline function into a JSON pipeline definition file.
compiler.Compiler().compile(
    pipeline_func=csv_preprocessing_pipeline,
    package_path="pipeline_demo_test_2.json",
)



#### Run the pipeline

In [36]:
# Point the SDK at the configured project and region before creating the job.
aiplatform.init(project=PROJECT_ID, location=LOCATION)

# Build the job from the compiled pipeline definition. Caching is disabled,
# so every submission re-executes the steps.
job = aiplatform.PipelineJob(
    display_name="test-pipeline-job",
    template_path="pipeline_demo_test_2.json",
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,
    project=PROJECT_ID,
    location=LOCATION,
)

print("submit pipeline job ....")
# Pass the service account by keyword so the call does not rely on the
# positional order of submit()'s parameters.
job.submit(service_account=SERVICE_ACCOUNT)


submit pipeline job ....
Creating PipelineJob
PipelineJob created. Resource name: projects/172483762390/locations/us-central1/pipelineJobs/test-pipeline-2-20241015224752
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/172483762390/locations/us-central1/pipelineJobs/test-pipeline-2-20241015224752')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/test-pipeline-2-20241015224752?project=172483762390
